diff --git a/dags/fin_porfel.py b/dags/fin_porfel.py new file mode 100644 index 0000000..fe511ca --- /dev/null +++ b/dags/fin_porfel.py @@ -0,0 +1,83 @@ +import re +import requests +import json +import pandas as pd +import numpy as np +from datetime import datetime +from requests.auth import HTTPBasicAuth +from sqlalchemy import create_engine +from airflow import DAG +from airflow.operators.python import PythonOperator + +default_args = { + 'owner': 'airflow', + 'depends_on_past': False, + 'start_date': datetime(2023, 1, 1), + 'retries': 1, +} + +def get_db_engine(): + """Создает подключение к PostgreSQL""" + DF_CONFIG = { + 'dbname': "postgres", + 'user': "postgres", + 'password': "4a00d4b90cd830da0796", + 'host': "postgresql", + 'port': "5432" + } + return create_engine( + f"postgresql+psycopg2://{DF_CONFIG['user']}:{DF_CONFIG['password']}@" + f"{DF_CONFIG['host']}:{DF_CONFIG['port']}/{DF_CONFIG['dbname']}", + pool_size=10, + max_overflow=20 + ) + +def upsert_list_fin_portfel(**kwargs): + engine = get_db_engine() + query = + """ + select distinct + osv.schet as schet + , osv.subkonto2 as subkonto2 + , null as summa_dogovora + , null as percent_value + from public.oborotno_salbdovaya_vedomostb osv + where osv.subkonto2 not in (select dst2.subkonto2 from public.dict_subkonto_sec dst2) + and (osv.schet like '%%01%%' or osv.schet like '%%03%%') + """ + df = pd.read_sql(query, engine) + + with engine.begin() as conn: + if not df.empty: + conn.execute("CREATE TEMP TABLE temp_fin_portfel as SELECT * FROM public.fin_portfel WHERE 1=0") + df.to_sql('temp_fin_portfel', con=conn, if_exists='append', index=False, method='multi') + conn.execute + (""" + INSERT INTO public.fin_portfel + SELECT + schet + , subkonto2 + , summa_dogovora + , percent_value + FROM temp_fin_portfel + """) + return 'Список обновлен.' + else: + return 'Обновлять нечего.' + +with DAG( + dag_id='update_fin_portfel', + default_args=default_args, + description='Обновление списка Финансового портфеля', + schedule_interval=None, + catchup=False, + tags=['sigma'], +) as dag: + + upsert_list_fin_portfel_task = PythonOperator( + task_id='upsert_list_fin_portfel', + python_callable='upsert_list_fin_portfel', + provide_context=True + ) + +upsert_list_fin_portfel_task \ No newline at end of file