import json
import re
from datetime import datetime

import numpy as np
import pandas as pd
import requests
from requests.auth import HTTPBasicAuth
from sqlalchemy import create_engine, text

from airflow import DAG
from airflow.operators.python import PythonOperator

# Default arguments applied to every task of the DAG defined in this module.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
}


def get_db_engine():
    """Create a SQLAlchemy engine for the PostgreSQL database.

    Returns:
        A new engine using the ``postgresql+psycopg2`` driver with
        ``pool_size=10`` and ``max_overflow=20``. Every call builds a fresh
        engine, so the caller is responsible for disposing of it.
    """
    # NOTE(review): credentials are hard-coded in source; they should come
    # from an Airflow Connection or a secrets backend - confirm with owners.
    DF_CONFIG = {
        'dbname': "postgres",
        'user': "postgres",
        'password': "4a00d4b90cd830da0796",
        'host': "postgresql",
        'port': "5432",
    }
    return create_engine(
        f"postgresql+psycopg2://{DF_CONFIG['user']}:{DF_CONFIG['password']}@"
        f"{DF_CONFIG['host']}:{DF_CONFIG['port']}/{DF_CONFIG['dbname']}",
        pool_size=10,
        max_overflow=20,
    )


def upsert_list_fin_portfel(**kwargs):
    """Add missing (schet, subkonto2) pairs to ``public.fin_porfel``.

    Selects the distinct ``schet`` / ``subkonto2`` pairs from
    ``public.oborotno_salbdovaya_vedomostb`` whose ``schet`` contains ``01``
    or ``03`` and that have no matching row in ``public.fin_porfel``, stages
    them in a temporary table and inserts them into ``public.fin_porfel``
    with ``summa_dogovora`` and ``percent_value`` set to NULL.

    Args:
        **kwargs: Accepted so the function can be used as an Airflow
            ``python_callable``; not read by the function.

    Returns:
        ``'Список обновлен.'`` when new rows were inserted,
        ``'Обновлять нечего.'`` when there was nothing to insert.
    """
    engine = get_db_engine()
    # '%%' is kept exactly as in the original query; in a LIKE pattern
    # consecutive wildcards match the same as a single one.
    query = """
        select distinct
            osv.schet as schet
            , osv.subkonto2 as subkonto2
            , null as summa_dogovora
            , null as percent_value
        from public.oborotno_salbdovaya_vedomostb osv
        left join public.fin_porfel fp
            on fp.schet = osv.schet
            and fp.subkonto2 = osv.subkonto2
        where (osv.schet like '%%01%%' or osv.schet like '%%03%%')
            and fp.subkonto2 is null
    """
    try:
        df = pd.read_sql(query, engine)
        if df.empty:
            return 'Обновлять нечего.'

        # Staging and the final insert share one transaction, so a failure
        # in either step rolls both back.
        with engine.begin() as conn:
            conn.execute(text(
                "CREATE TEMP TABLE temp_fin_porfel ("
                "schet text null, subkonto2 text null, "
                "summa_dogovora text null, percent_value text null)"
            ))
            df.to_sql(
                'temp_fin_porfel',
                con=conn,
                if_exists='append',
                index=False,
                method='multi',
            )
            # The original returned the staged rows right here (a debugging
            # leftover), which made this INSERT unreachable.
            # NOTE(review): the insert is positional - assumes fin_porfel's
            # columns are (schet, subkonto2, summa_dogovora, percent_value)
            # in that order; confirm against the table definition.
            conn.execute(text("""
                INSERT INTO public.fin_porfel
                SELECT distinct
                    schet
                    , subkonto2
                    , summa_dogovora
                    , percent_value
                FROM temp_fin_porfel
            """))
        return 'Список обновлен.'
    finally:
        # The engine is created per call; release its pooled connections.
        engine.dispose()
# DAG with a single task that adds missing entries to public.fin_porfel.
# schedule_interval=None: the DAG has no schedule and runs only when triggered.
with DAG(
    dag_id='update_fin_portfel',
    default_args=default_args,
    description='Обновление списка Финансового портфеля',
    schedule_interval=None,
    catchup=False,
    tags=['sigma'],
) as dag:
    # provide_context is omitted: it is deprecated in Airflow 2.x, where the
    # context is passed to the callable's **kwargs automatically.
    upsert_list_fin_portfel_task = PythonOperator(
        task_id='upsert_list_fin_portfel',
        python_callable=upsert_list_fin_portfel,
    )