# sigma/dags/fin_porfel.py
# (file-viewer header from the original listing: 86 lines, 2.6 KiB, Python)

import re
import requests
import json
import pandas as pd
import numpy as np
from datetime import datetime
from requests.auth import HTTPBasicAuth
from sqlalchemy import create_engine
from airflow import DAG
from airflow.operators.python import PythonOperator
# Arguments handed to the DAG below as its ``default_args``.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    start_date=datetime(2023, 1, 1),
    retries=1,
)
def get_db_engine():
    """Create a SQLAlchemy engine for the PostgreSQL database.

    Returns:
        A new engine built by ``create_engine`` with ``pool_size=10`` and
        ``max_overflow=20``. Every call builds a separate engine.
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import quote_plus

    # NOTE(review): the credentials are hard-coded in source; consider moving
    # them to an Airflow Connection or to environment variables.
    db_config = {
        'dbname': "postgres",
        'user': "postgres",
        'password': "4a00d4b90cd830da0796",
        'host': "postgresql",
        'port': "5432",
    }

    # Percent-encode the credentials so characters such as '@', ':' or '/'
    # in a user name or password cannot corrupt the connection URL.
    user = quote_plus(db_config['user'])
    password = quote_plus(db_config['password'])

    return create_engine(
        f"postgresql+psycopg2://{user}:{password}@"
        f"{db_config['host']}:{db_config['port']}/{db_config['dbname']}",
        pool_size=10,
        max_overflow=20,
    )
def upsert_list_fin_portfel(**kwargs):
    """Add missing (schet, subkonto2) pairs to ``public.fin_porfel``.

    Selects the distinct pairs from ``public.oborotno_salbdovaya_vedomostb``
    whose ``schet`` contains '01' or '03' and that have no matching row in
    ``public.fin_porfel``, stages them in a temporary table and inserts them
    with NULL ``summa_dogovora`` and ``percent_value``.

    Args:
        **kwargs: Accepted so Airflow can pass the task context; not used.

    Returns:
        'Список обновлен.' when rows were inserted, 'Обновлять нечего.' when
        there was nothing to insert.
    """
    # Function-scope import keeps this block self-contained; text() makes the
    # plain SQL statements executable on SQLAlchemy 1.x and 2.x alike.
    from sqlalchemy import text

    # NOTE(review): rows whose subkonto2 is NULL can never satisfy the join's
    # equality, so they are selected again on every run - confirm intended.
    # The query is passed to pandas as a plain string because it relies on
    # '%%' escaping for the literal '%' wildcards.
    query = """
        select distinct
        osv.schet as schet
        , osv.subkonto2 as subkonto2
        , null as summa_dogovora
        , null as percent_value
        from public.oborotno_salbdovaya_vedomostb osv
        left join public.fin_porfel fp
        on fp.schet = osv.schet
        and fp.subkonto2 = osv.subkonto2
        where (osv.schet like '%%01%%' or osv.schet like '%%03%%')
        and fp.subkonto2 is null
    """

    engine = get_db_engine()
    try:
        df = pd.read_sql(query, engine)
        if df.empty:
            return 'Обновлять нечего.'

        # One transaction: create the staging table, load it, copy the rows.
        with engine.begin() as conn:
            conn.execute(text(
                "CREATE TEMP TABLE temp_fin_porfel (schet text null, subkonto2 text null, summa_dogovora text null, percent_value text null)"
            ))
            # Must use the same connection: a temp table is only visible to
            # the session that created it.
            df.to_sql(
                'temp_fin_porfel',
                con=conn,
                if_exists='append',
                index=False,
                method='multi',
            )
            conn.execute(text("""
                INSERT INTO public.fin_porfel
                SELECT distinct
                schet
                , subkonto2
                , summa_dogovora
                , percent_value
                FROM temp_fin_porfel
            """))
        return 'Список обновлен.'
    finally:
        # Close pooled connections; ending the session also discards the
        # temporary table, so a later call can create it again.
        engine.dispose()
# DAG with no schedule (schedule_interval=None): it runs only when triggered.
with DAG(
    dag_id='update_fin_portfel',
    default_args=default_args,
    description='Обновление списка Финансового портфеля',
    schedule_interval=None,
    catchup=False,
    tags=['sigma'],
) as dag:
    # `provide_context` is intentionally not passed: Airflow 2 hands the task
    # context to the callable's **kwargs automatically and deprecates the flag.
    upsert_list_fin_portfel_task = PythonOperator(
        task_id='upsert_list_fin_portfel',
        python_callable=upsert_list_fin_portfel,
    )

    # Single task; there are no dependencies to declare.
    upsert_list_fin_portfel_task