From 54f59e8b2ad058616244e3abfc862ffa1453ea28 Mon Sep 17 00:00:00 2001 From: bn_user Date: Thu, 13 Nov 2025 20:37:26 +0000 Subject: [PATCH] =?UTF-8?q?=D0=94=D0=BE=D0=B1=D0=B0=D0=B2=D0=B8=D1=82?= =?UTF-8?q?=D1=8C=20dags/pogasheniya.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dags/pogasheniya.py | 105 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 105 insertions(+) create mode 100644 dags/pogasheniya.py diff --git a/dags/pogasheniya.py b/dags/pogasheniya.py new file mode 100644 index 0000000..e5f89b5 --- /dev/null +++ b/dags/pogasheniya.py @@ -0,0 +1,105 @@ + +import requests +import json +import pandas as pd +import numpy as np +from datetime import datetime +from requests.auth import HTTPBasicAuth +from sqlalchemy import create_engine +from airflow import DAG +from airflow.operators.python import PythonOperator + +default_args = { + 'owner': 'airflow', + 'depends_on_past': False, + 'start_date': datetime(2023, 1, 1), + 'retries': 1, +} + +def get_db_engine(): + """Создает подключение к PostgreSQL""" + DF_CONFIG = { + 'dbname': "postgres", + 'user': "postgres", + 'password': "4a00d4b90cd830da0796", + 'host': "postgresql", + 'port': "5432" + } + return create_engine( + f"postgresql+psycopg2://{DF_CONFIG['user']}:{DF_CONFIG['password']}@" + f"{DF_CONFIG['host']}:{DF_CONFIG['port']}/{DF_CONFIG['dbname']}", + pool_size=10, + max_overflow=20 + ) + +def pogasheniya(**kwargs): + engine = get_db_engine() + query = """ + SELECT + osv.schet + , osv.uid_subkonto2 uid_dogovor + , osv.name + , osv.nomer + , osv.date_begin + , osv.date_end + FROM oborotno_salbdovaya_vedomostb osv + LEFT JOIN pogasheniya p on p.uid_dogovor = osv.uid_subkonto2 and p.schet = osv.schet + WHERE p.uid_dogovor is null + UNION ALL + SELECT + '76.07.1' schet + , '00000000-0000-0000-0000-000000000000' uid_dogovor + , 'Лизинг' name + , osv.agreement_num + , osv.agreement_date + , osv.redemption_date + FROM lizingi_garantii osv + LEFT JOIN pogasheniya p on p.uid_dogovor = osv.uid_subkonto2 and p.schet = osv.schet and p.nomer = osv.agreement_num + WHERE p.uid_dogovor is null + """ + df = pd.read_sql(query, engine) + with engine.begin() as conn: + if not df.empty: + conn.execute("CREATE TEMP TABLE temp_pogasheniya (schet text null, uid_dogovor text null, name text null, nomer text null, date_begin text null, date_end text null)") + df.to_sql('temp_pogasheniya', con=conn, if_exists='append', index=False, method='multi') + conn.execute(""" + INSERT INTO public.pogasheniya (schet, uid_dogovor, name, nomer, date_begin, date_end) + SELECT * FROM temp_pogasheniya + ON CONFLICT (schet, uid_dogovor, nomer) + name = EXCLUDED.name, + agreement_num = EXCLUDED.agreement_num, + agreement_date = EXCLUDED.agreement_date, + redemption_date = EXCLUDED.redemption_date + """) + conn.execute(""" + UPDATE public.pogasheniya fp + SET id = subquery.new_id + FROM ( + SELECT + id, + ROW_NUMBER() OVER (ORDER BY uid_dogovor, schet, nomer) as new_id + FROM public.pogasheniya + ) AS subquery + WHERE fp.id = subquery.id; + """ + ) + return 'Список обновлен.' + else: + return 'Обновлять нечего.' + +with DAG( + dag_id='update_pogasheniya', + default_args=default_args, + description='Обновление списка погашений', + schedule_interval=None, + catchup=False, + tags=['sigma'], +) as dag: + + pogasheniya_task = PythonOperator( + task_id='pogasheniya', + python_callable=pogasheniya, + provide_context=True + ) + +pogasheniya_task \ No newline at end of file