import json
import os
import re
from datetime import datetime
from urllib.parse import quote_plus

import numpy as np
import pandas as pd
import requests
from airflow import DAG
from airflow.operators.python import PythonOperator
from requests.auth import HTTPBasicAuth
from sqlalchemy import create_engine, text

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
}


def get_db_engine():
    """Create a SQLAlchemy engine for the PostgreSQL database.

    Every connection setting can be overridden with an environment variable
    (FIN_PORTFEL_DB_NAME, FIN_PORTFEL_DB_USER, FIN_PORTFEL_DB_PASSWORD,
    FIN_PORTFEL_DB_HOST, FIN_PORTFEL_DB_PORT); when a variable is unset the
    previously hard-coded value is used, so existing deployments keep working.

    Returns:
        sqlalchemy.engine.Engine: a new engine with its own connection pool.
        The caller owns it and should call ``dispose()`` when done.
    """
    # NOTE(review): a plaintext password committed to source is a credential
    # leak. Provide FIN_PORTFEL_DB_PASSWORD (or move to an Airflow Connection)
    # and then remove this fallback value.
    db_config = {
        'dbname': os.environ.get('FIN_PORTFEL_DB_NAME', "postgres"),
        'user': os.environ.get('FIN_PORTFEL_DB_USER', "postgres"),
        'password': os.environ.get('FIN_PORTFEL_DB_PASSWORD', "4a00d4b90cd830da0796"),
        'host': os.environ.get('FIN_PORTFEL_DB_HOST', "postgresql"),
        'port': os.environ.get('FIN_PORTFEL_DB_PORT', "5432"),
    }
    # quote_plus keeps the URL valid if the user/password contain characters
    # such as '@', ':' or '/'; it is a no-op for the current values.
    return create_engine(
        f"postgresql+psycopg2://{quote_plus(db_config['user'])}:"
        f"{quote_plus(db_config['password'])}@"
        f"{db_config['host']}:{db_config['port']}/{db_config['dbname']}",
        pool_size=10,
        max_overflow=20,
    )


# Insert every (uid_subkonto2, schet) pair from the trial-balance table whose
# account contains '01' or '03' and that is not yet in fin_porfel.
# DISTINCT ON guarantees at most one row per conflict key: with a plain
# DISTINCT over (uid, schet, name), two different names for the same key would
# make PostgreSQL abort with "ON CONFLICT DO UPDATE command cannot affect row a
# second time". The ORDER BY makes the chosen name deterministic.
# NULLs are left untyped in the outer SELECT so PostgreSQL coerces them to the
# target column types.
# NOTE(review): rows with NULL uid_subkonto2 can never be matched by the
# anti-join (NULL = NULL is not true), so they would be re-inserted on every
# run — confirm whether they should be filtered out.
_INSERT_NEW_CONTRACTS_SQL = text("""
    INSERT INTO public.fin_porfel
        (uid_dogovor, schet, name, summa_dogovora, percent_value)
    SELECT src.uid_dogovor, src.schet, src.name, NULL, NULL
    FROM (
        SELECT DISTINCT ON (osv.uid_subkonto2, osv.schet)
               osv.uid_subkonto2 AS uid_dogovor,
               osv.schet         AS schet,
               osv.subkonto2     AS name
        FROM public.oborotno_salbdovaya_vedomostb osv
        LEFT JOIN public.fin_porfel fp
               ON fp.schet = osv.schet
              AND fp.uid_dogovor = osv.uid_subkonto2
        WHERE (osv.schet LIKE '%01%' OR osv.schet LIKE '%03%')
          AND fp.uid_dogovor IS NULL
        ORDER BY osv.uid_subkonto2, osv.schet, osv.subkonto2
    ) AS src
    ON CONFLICT (uid_dogovor, schet) DO UPDATE
        SET name = EXCLUDED.name,
            summa_dogovora = EXCLUDED.summa_dogovora,
            percent_value = EXCLUDED.percent_value
""")

# Renumber ids 1..N ordered by (uid_dogovor, schet, summa_dogovora).
# Done in two steps: PostgreSQL checks a non-deferrable UNIQUE/PK constraint
# after each modified row, so assigning the final ids directly can collide
# with an id that another row has not vacated yet. Parking every row on a
# negative id first, then flipping the sign, avoids those intermediate clashes.
_RENUMBER_TO_NEGATIVE_SQL = text("""
    UPDATE public.fin_porfel fp
    SET id = -subquery.new_id
    FROM (
        SELECT id,
               ROW_NUMBER() OVER (ORDER BY uid_dogovor, schet, summa_dogovora) AS new_id
        FROM public.fin_porfel
    ) AS subquery
    WHERE fp.id = subquery.id
""")

_FLIP_NEGATIVE_IDS_SQL = text("""
    UPDATE public.fin_porfel
    SET id = -id
    WHERE id < 0
""")


def upsert_list_fin_portfel(**kwargs):
    """Add missing contracts to public.fin_porfel and renumber its ids.

    Everything runs in one transaction: the insert of new
    (uid_dogovor, schet) pairs and, only when something was written, the
    renumbering of ``id``.

    Args:
        **kwargs: Airflow task context; not used.

    Returns:
        str: 'Список обновлен.' if rows were written,
        otherwise 'Обновлять нечего.'.
    """
    engine = get_db_engine()
    try:
        with engine.begin() as conn:
            result = conn.execute(_INSERT_NEW_CONTRACTS_SQL)
            if result.rowcount == 0:
                return 'Обновлять нечего.'
            conn.execute(_RENUMBER_TO_NEGATIVE_SQL)
            conn.execute(_FLIP_NEGATIVE_IDS_SQL)
        return 'Список обновлен.'
    finally:
        # The engine is created per call; release its pooled connections.
        engine.dispose()


with DAG(
    dag_id='update_fin_portfel',
    default_args=default_args,
    description='Обновление списка Финансового портфеля',
    # Kept as schedule_interval for compatibility with Airflow < 2.4.
    schedule_interval=None,
    catchup=False,
    tags=['sigma'],
) as dag:
    # provide_context is omitted: it is deprecated in Airflow 2 and the
    # callable does not use the context anyway.
    upsert_list_fin_portfel_task = PythonOperator(
        task_id='upsert_list_fin_portfel',
        python_callable=upsert_list_fin_portfel,
    )