"""Airflow DAG that refreshes the ``public.pogasheniya`` table in PostgreSQL."""
import json
from datetime import datetime

import numpy as np
import pandas as pd
import requests
from requests.auth import HTTPBasicAuth
from sqlalchemy import create_engine, text

from airflow import DAG
from airflow.operators.python import PythonOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
}


def get_db_engine():
    """Create a SQLAlchemy engine connected to PostgreSQL.

    Returns:
        A new engine built from the connection settings below, with a pool
        of 10 connections and up to 20 overflow connections. The caller owns
        the engine and should call ``dispose()`` when finished with it.
    """
    # NOTE(review): the password is hard-coded in source control. Consider
    # moving the credentials to an Airflow Connection or a secrets backend.
    db_config = {
        'dbname': "postgres",
        'user': "postgres",
        'password': "4a00d4b90cd830da0796",
        'host': "postgresql",
        'port': "5432",
    }
    return create_engine(
        f"postgresql+psycopg2://{db_config['user']}:{db_config['password']}@"
        f"{db_config['host']}:{db_config['port']}/{db_config['dbname']}",
        pool_size=10,
        max_overflow=20,
    )


def pogasheniya(**kwargs):
    """Add missing contracts to ``public.pogasheniya`` and renumber its ids.

    Selects rows from ``oborotno_salbdovaya_vedomostb`` (accounts whose text
    contains ``03`` or ``01``) and from ``lizingi_garantii`` that have no
    matching row in ``pogasheniya``, upserts them through a temporary table,
    and then rewrites ``id`` as a row number ordered by
    ``uid_dogovor, schet, nomer``.

    Args:
        **kwargs: Keyword arguments supplied by the PythonOperator; unused.

    Returns:
        'Список обновлен.' when rows were written, 'Обновлять нечего.' when
        the selection returned no rows.
    """
    query = text("""
        SELECT
              osv.schet
            , osv.uid_subkonto2 as uid_dogovor
            , osv.subkonto2 as name
            , osv.nomer
            , osv.date_begin
            , osv.date_end
        FROM oborotno_salbdovaya_vedomostb osv
        LEFT JOIN pogasheniya p
            on p.uid_dogovor = osv.uid_subkonto2 and p.schet = osv.schet
        WHERE p.uid_dogovor is null
            and (osv.schet::text like '%03%' OR osv.schet::text like '%01%')
        UNION ALL
        SELECT
              '76.07.1' as schet
            , '00000000-0000-0000-0000-000000000000' as uid_dogovor
            , 'Лизинг' as name
            , osv.agreement_num as nomer
            , osv.agreement_date as date_begin
            , osv.redemption_date as date_end
        FROM lizingi_garantii osv
        LEFT JOIN pogasheniya p on p.nomer = osv.agreement_num
        WHERE p.uid_dogovor is null
    """)

    engine = get_db_engine()
    try:
        df = pd.read_sql(query, engine)

        # Nothing to insert: skip opening a write transaction altogether.
        if df.empty:
            return 'Обновлять нечего.'

        # engine.begin() commits on success and rolls back on any exception,
        # so the upsert and the renumbering are applied atomically.
        with engine.begin() as conn:
            # Textual SQL is wrapped in text(): passing a plain str to
            # Connection.execute() is deprecated in SQLAlchemy 1.4 and
            # rejected in 2.0.
            conn.execute(text(
                "CREATE TEMP TABLE temp_pogasheniya ("
                "schet text null, uid_dogovor text null, name text null, "
                "nomer text null, date_begin text null, date_end text null)"
            ))
            df.to_sql(
                'temp_pogasheniya',
                con=conn,
                if_exists='append',
                index=False,
                method='multi',
            )
            # Columns are listed explicitly instead of SELECT * so the insert
            # does not depend on the temp table's column order.
            conn.execute(text("""
                INSERT INTO public.pogasheniya
                    (schet, uid_dogovor, name, nomer, date_begin, date_end)
                SELECT schet, uid_dogovor, name, nomer, date_begin, date_end
                FROM temp_pogasheniya
                ON CONFLICT (schet, uid_dogovor, nomer) DO UPDATE
                SET name = EXCLUDED.name,
                    nomer = EXCLUDED.nomer,
                    date_begin = EXCLUDED.date_begin,
                    date_end = EXCLUDED.date_end
            """))
            # NOTE(review): if "id" carries a non-deferrable unique
            # constraint this renumbering may hit transient duplicates
            # mid-statement — confirm against the table definition.
            conn.execute(text("""
                UPDATE public.pogasheniya fp
                SET id = subquery.new_id
                FROM (
                    SELECT
                        id,
                        ROW_NUMBER() OVER (ORDER BY uid_dogovor, schet, nomer) as new_id
                    FROM public.pogasheniya
                ) AS subquery
                WHERE fp.id = subquery.id;
            """))
        return 'Список обновлен.'
    finally:
        # A fresh engine (and pool) is created on every task run; dispose of
        # it so pooled connections are closed instead of leaking. Closing
        # the connection also drops the session-scoped temp table.
        engine.dispose()


with DAG(
    dag_id='update_pogasheniya',
    default_args=default_args,
    description='Обновление списка погашений',
    schedule_interval=None,
    catchup=False,
    tags=['sigma'],
) as dag:

    # provide_context is not passed: it is deprecated in Airflow 2, where the
    # context is handed to callables accepting **kwargs automatically.
    pogasheniya_task = PythonOperator(
        task_id='pogasheniya',
        python_callable=pogasheniya,
    )

    pogasheniya_task