sigma/dags/fin_porfel.py

103 lines
3.3 KiB
Python

import re
import requests
import json
import pandas as pd
import numpy as np
from datetime import datetime
from requests.auth import HTTPBasicAuth
from sqlalchemy import create_engine
from airflow import DAG
from airflow.operators.python import PythonOperator
# Task-level defaults handed to the DAG declared in this file via
# ``default_args=``.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    start_date=datetime(2023, 1, 1),
    retries=1,  # one retry per task
)
def get_db_engine():
    """Create a SQLAlchemy engine for the PostgreSQL database.

    Each connection setting can be overridden with an environment variable
    (``FIN_PORFEL_DB_NAME``, ``FIN_PORFEL_DB_USER``, ``FIN_PORFEL_DB_PASSWORD``,
    ``FIN_PORFEL_DB_HOST``, ``FIN_PORFEL_DB_PORT``). When a variable is not
    set, the value that used to be hard-coded here is used, so existing
    deployments keep working unchanged.

    Returns:
        The engine returned by ``create_engine`` for the
        ``postgresql+psycopg2`` URL, with ``pool_size=10`` and
        ``max_overflow=20``.
    """
    # Function-scope imports keep this block self-contained.
    import os
    from urllib.parse import quote_plus

    # NOTE(review): the fallback password is still committed to the
    # repository; set FIN_PORFEL_DB_PASSWORD in the environment and drop the
    # default once every deployment provides it.
    db_config = {
        'dbname': os.environ.get('FIN_PORFEL_DB_NAME', "postgres"),
        'user': os.environ.get('FIN_PORFEL_DB_USER', "postgres"),
        'password': os.environ.get('FIN_PORFEL_DB_PASSWORD', "4a00d4b90cd830da0796"),
        'host': os.environ.get('FIN_PORFEL_DB_HOST', "postgresql"),
        'port': os.environ.get('FIN_PORFEL_DB_PORT', "5432"),
    }

    # Escape user and password so characters such as '@', ':' or '/' cannot
    # corrupt the URL. The default values contain none, so the default URL is
    # identical to the one built before.
    url = (
        f"postgresql+psycopg2://{quote_plus(db_config['user'])}:"
        f"{quote_plus(db_config['password'])}@"
        f"{db_config['host']}:{db_config['port']}/{db_config['dbname']}"
    )
    return create_engine(url, pool_size=10, max_overflow=20)
def upsert_list_fin_portfel(**kwargs):
    """Add missing contracts to ``public.fin_porfel`` and renumber its ids.

    Steps:
      1. Select distinct ``(uid_subkonto2, schet, subkonto2)`` rows from
         ``public.oborotno_salbdovaya_vedomostb`` whose ``schet`` contains
         ``01`` or ``03`` and that have no matching
         ``(schet, uid_dogovor)`` row in ``public.fin_porfel``.
      2. If any were found, stage them in a temporary table and upsert them
         into ``public.fin_porfel`` (conflict target
         ``(uid_dogovor, schet)``).
      3. Rewrite ``id`` of every row in ``public.fin_porfel`` as a row number
         ordered by ``uid_dogovor, schet, summa_dogovora``.

    Steps 2 and 3 run in a single transaction.

    Args:
        **kwargs: Accepted so Airflow can pass the task context; unused.

    Returns:
        str: ``'Список обновлен.'`` when new rows were written, otherwise
        ``'Обновлять нечего.'``.
    """
    # Raw SQL strings must be wrapped in text(): Connection.execute() no
    # longer accepts plain strings in SQLAlchemy 2.0, while text() works on
    # the 1.x series as well.
    from sqlalchemy import text

    engine = get_db_engine()
    try:
        # NOTE(review): rows whose uid_subkonto2 is NULL can never match the
        # join below nor the ON CONFLICT target, so they would be re-inserted
        # on every run - confirm the column is never NULL.
        query = """
            select distinct
                osv.uid_subkonto2 uid_dogovor
                , osv.schet as schet
                , osv.subkonto2 as name
                , null as summa_dogovora
                , null as percent_value
            from public.oborotno_salbdovaya_vedomostb osv
            left join public.fin_porfel fp
                on fp.schet = osv.schet
                and fp.uid_dogovor = osv.uid_subkonto2
            where (osv.schet like '%%01%%' or osv.schet like '%%03%%')
                and fp.uid_dogovor is null
        """
        df = pd.read_sql(query, engine)

        if df.empty:
            return 'Обновлять нечего.'

        with engine.begin() as conn:
            # ON COMMIT DROP removes the staging table when the transaction
            # ends; otherwise it lingers on the pooled connection and a later
            # run on that connection fails with "relation already exists".
            conn.execute(text(
                "CREATE TEMP TABLE temp_fin_porfel ("
                "uid_dogovor text null, schet text null, name text null, "
                "summa_dogovora text null, percent_value text null"
                ") ON COMMIT DROP"
            ))
            df.to_sql(
                'temp_fin_porfel',
                con=conn,
                if_exists='append',
                index=False,
                method='multi',
            )
            conn.execute(text("""
                INSERT INTO public.fin_porfel (uid_dogovor, schet, name, summa_dogovora, percent_value)
                SELECT DISTINCT
                    uid_dogovor
                    , schet
                    , name
                    , summa_dogovora
                    , percent_value
                FROM temp_fin_porfel
                ON CONFLICT (uid_dogovor, schet)
                DO UPDATE SET
                    name = EXCLUDED.name,
                    summa_dogovora = EXCLUDED.summa_dogovora,
                    percent_value = EXCLUDED.percent_value
            """))
            # NOTE(review): this renumbers the id of EVERY row in the table,
            # not only the new ones; anything that stores fin_porfel.id
            # elsewhere would be affected - confirm this is intended.
            conn.execute(text("""
                UPDATE public.fin_porfel fp
                SET id = subquery.new_id
                FROM (
                    SELECT
                        id,
                        ROW_NUMBER() OVER (ORDER BY uid_dogovor, schet, summa_dogovora) as new_id
                    FROM public.fin_porfel
                ) AS subquery
                WHERE fp.id = subquery.id;
            """))
        return 'Список обновлен.'
    finally:
        # A new engine (with its own pool) is built on every call; dispose it
        # so its connections are closed instead of leaking.
        engine.dispose()
# Manually triggered DAG (no schedule) with a single task that refreshes the
# financial portfolio list.
with DAG(
    dag_id='update_fin_portfel',
    default_args=default_args,
    description='Обновление списка Финансового портфеля',
    schedule_interval=None,
    catchup=False,
    tags=['sigma'],
) as dag:
    # ``provide_context`` is intentionally omitted: it is deprecated since
    # Airflow 2.0, where the context is passed automatically to callables
    # that accept ``**kwargs``.
    upsert_list_fin_portfel_task = PythonOperator(
        task_id='upsert_list_fin_portfel',
        python_callable=upsert_list_fin_portfel,
    )
    # Single-task DAG: there are no dependencies to declare.