import re
import requests
import json
import pandas as pd
import numpy as np
from datetime import datetime
from requests.auth import HTTPBasicAuth
from sqlalchemy import create_engine, text
from airflow import DAG
from airflow.operators.python import PythonOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
}


def get_db_engine():
    """Create a SQLAlchemy engine for the PostgreSQL database.

    NOTE(review): the credentials below are hard-coded and committed to
    source control. Move them to an Airflow Connection / secrets backend
    and rotate this password.
    """
    DF_CONFIG = {
        'dbname': "postgres",
        'user': "postgres",
        'password': "4a00d4b90cd830da0796",
        'host': "postgresql",
        'port': "5432",
    }
    return create_engine(
        f"postgresql+psycopg2://{DF_CONFIG['user']}:{DF_CONFIG['password']}@"
        f"{DF_CONFIG['host']}:{DF_CONFIG['port']}/{DF_CONFIG['dbname']}",
        pool_size=10,
        max_overflow=20,
    )


# Copy every distinct (schet, subkonto2) pair from the turnover sheet whose
# subkonto2 is not yet in dict_subkonto_sec and whose schet contains "01" or
# "03" into fin_portfel, with summa_dogovora / percent_value left NULL.
#
# - Target columns are listed explicitly so the insert does not depend on the
#   physical column order of public.fin_portfel.
# - NOT EXISTS is used instead of NOT IN: a single NULL in
#   dict_subkonto_sec.subkonto2 would make NOT IN filter out every row.
# - Untyped NULLs in the outer SELECT are coerced to the target column types.
_INSERT_NEW_FIN_PORTFEL_SQL = text("""
    INSERT INTO public.fin_portfel (schet, subkonto2, summa_dogovora, percent_value)
    SELECT src.schet, src.subkonto2, NULL, NULL
    FROM (
        SELECT DISTINCT osv.schet, osv.subkonto2
        FROM public.oborotno_salbdovaya_vedomostb osv
        WHERE osv.subkonto2 IS NOT NULL
          AND NOT EXISTS (
              SELECT 1
              FROM public.dict_subkonto_sec dst2
              WHERE dst2.subkonto2 = osv.subkonto2
          )
          AND (osv.schet LIKE '%01%' OR osv.schet LIKE '%03%')
    ) AS src
""")


def upsert_list_fin_portfel(**kwargs):
    """Append new (schet, subkonto2) pairs to public.fin_portfel.

    The selection and insert run as a single INSERT ... SELECT inside one
    transaction, so no data is round-tripped through pandas and no temporary
    table is left behind on the connection.

    Args:
        **kwargs: Airflow task context (unused).

    Returns:
        'Список обновлен.' if at least one row was inserted,
        otherwise 'Обновлять нечего.'.
    """
    engine = get_db_engine()
    try:
        with engine.begin() as conn:
            inserted = conn.execute(_INSERT_NEW_FIN_PORTFEL_SQL).rowcount
    finally:
        # The engine is created per call; release its pooled connections.
        engine.dispose()

    if inserted > 0:
        return 'Список обновлен.'
    return 'Обновлять нечего.'


with DAG(
    dag_id='update_fin_portfel',
    default_args=default_args,
    description='Обновление списка Финансового портфеля',
    schedule_interval=None,
    catchup=False,
    tags=['sigma'],
) as dag:
    # python_callable must be the function object itself, not its name.
    # provide_context is unnecessary in Airflow 2: the context is passed as
    # keyword arguments automatically.
    upsert_list_fin_portfel_task = PythonOperator(
        task_id='upsert_list_fin_portfel',
        python_callable=upsert_list_fin_portfel,
    )

    upsert_list_fin_portfel_task