From 61a4b46c9b54b37b9cb8e41a2b3ea1f57dbed653 Mon Sep 17 00:00:00 2001 From: bn_user Date: Wed, 10 Sep 2025 08:04:32 +0000 Subject: [PATCH] =?UTF-8?q?=D0=94=D0=BE=D0=B1=D0=B0=D0=B2=D0=B8=D1=82?= =?UTF-8?q?=D1=8C=20dags/fin=5Fporfel.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dags/fin_porfel.py | 83 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 83 insertions(+) create mode 100644 dags/fin_porfel.py diff --git a/dags/fin_porfel.py b/dags/fin_porfel.py new file mode 100644 index 0000000..fe511ca --- /dev/null +++ b/dags/fin_porfel.py @@ -0,0 +1,83 @@ +import re +import requests +import json +import pandas as pd +import numpy as np +from datetime import datetime +from requests.auth import HTTPBasicAuth +from sqlalchemy import create_engine +from airflow import DAG +from airflow.operators.python import PythonOperator + +default_args = { + 'owner': 'airflow', + 'depends_on_past': False, + 'start_date': datetime(2023, 1, 1), + 'retries': 1, +} + +def get_db_engine(): + """Создает подключение к PostgreSQL""" + DF_CONFIG = { + 'dbname': "postgres", + 'user': "postgres", + 'password': "4a00d4b90cd830da0796", + 'host': "postgresql", + 'port': "5432" + } + return create_engine( + f"postgresql+psycopg2://{DF_CONFIG['user']}:{DF_CONFIG['password']}@" + f"{DF_CONFIG['host']}:{DF_CONFIG['port']}/{DF_CONFIG['dbname']}", + pool_size=10, + max_overflow=20 + ) + +def upsert_list_fin_portfel(**kwargs): + engine = get_db_engine() + query = + """ + select distinct + osv.schet as schet + , osv.subkonto2 as subkonto2 + , null as summa_dogovora + , null as percent_value + from public.oborotno_salbdovaya_vedomostb osv + where osv.subkonto2 not in (select dst2.subkonto2 from public.dict_subkonto_sec dst2) + and (osv.schet like '%%01%%' or osv.schet like '%%03%%') + """ + df = pd.read_sql(query, engine) + + with engine.begin() as conn: + if not df.empty: + conn.execute("CREATE TEMP TABLE temp_fin_portfel as SELECT * FROM public.fin_portfel WHERE 1=0") + df.to_sql('temp_fin_portfel', con=conn, if_exists='append', index=False, method='multi') + conn.execute + (""" + INSERT INTO public.fin_portfel + SELECT + schet + , subkonto2 + , summa_dogovora + , percent_value + FROM temp_fin_portfel + """) + return 'Список обновлен.' + else: + return 'Обновлять нечего.' + +with DAG( + dag_id='update_fin_portfel', + default_args=default_args, + description='Обновление списка Финансового портфеля', + schedule_interval=None, + catchup=False, + tags=['sigma'], +) as dag: + + upsert_list_fin_portfel_task = PythonOperator( + task_id='upsert_list_fin_portfel', + python_callable='upsert_list_fin_portfel', + provide_context=True + ) + +upsert_list_fin_portfel_task \ No newline at end of file