sigma/dags/fin_porfel.py

83 lines
2.4 KiB
Python

import re
import requests
import json
import pandas as pd
import numpy as np
from datetime import datetime
from requests.auth import HTTPBasicAuth
from sqlalchemy import create_engine
from airflow import DAG
from airflow.operators.python import PythonOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 1, 1),
'retries': 1,
}
def get_db_engine():
"""Создает подключение к PostgreSQL"""
DF_CONFIG = {
'dbname': "postgres",
'user': "postgres",
'password': "4a00d4b90cd830da0796",
'host': "postgresql",
'port': "5432"
}
return create_engine(
f"postgresql+psycopg2://{DF_CONFIG['user']}:{DF_CONFIG['password']}@"
f"{DF_CONFIG['host']}:{DF_CONFIG['port']}/{DF_CONFIG['dbname']}",
pool_size=10,
max_overflow=20
)
def upsert_list_fin_portfel(**kwargs):
engine = get_db_engine()
query =
"""
select distinct
osv.schet as schet
, osv.subkonto2 as subkonto2
, null as summa_dogovora
, null as percent_value
from public.oborotno_salbdovaya_vedomostb osv
where osv.subkonto2 not in (select dst2.subkonto2 from public.dict_subkonto_sec dst2)
and (osv.schet like '%%01%%' or osv.schet like '%%03%%')
"""
df = pd.read_sql(query, engine)
with engine.begin() as conn:
if not df.empty:
conn.execute("CREATE TEMP TABLE temp_fin_portfel as SELECT * FROM public.fin_portfel WHERE 1=0")
df.to_sql('temp_fin_portfel', con=conn, if_exists='append', index=False, method='multi')
conn.execute
("""
INSERT INTO public.fin_portfel
SELECT
schet
, subkonto2
, summa_dogovora
, percent_value
FROM temp_fin_portfel
""")
return 'Список обновлен.'
else:
return 'Обновлять нечего.'
with DAG(
dag_id='update_fin_portfel',
default_args=default_args,
description='Обновление списка Финансового портфеля',
schedule_interval=None,
catchup=False,
tags=['sigma'],
) as dag:
upsert_list_fin_portfel_task = PythonOperator(
task_id='upsert_list_fin_portfel',
python_callable=upsert_list_fin_portfel,
provide_context=True
)
upsert_list_fin_portfel_task