From cb802e7b5be757c8139df29bd649e4d44ef0d66a Mon Sep 17 00:00:00 2001 From: bn_user Date: Mon, 1 Dec 2025 05:58:39 +0000 Subject: [PATCH] =?UTF-8?q?=D0=9E=D0=B1=D0=BD=D0=BE=D0=B2=D0=B8=D1=82?= =?UTF-8?q?=D1=8C=20dags/OSV=5Fwith=5Fdocs.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dags/OSV_with_docs.py | 61 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 60 insertions(+), 1 deletion(-) diff --git a/dags/OSV_with_docs.py b/dags/OSV_with_docs.py index 528cd79..3aa2287 100644 --- a/dags/OSV_with_docs.py +++ b/dags/OSV_with_docs.py @@ -238,6 +238,59 @@ def read_data_1C(**kwargs): """ ) +def upsert_list_fin_portfel(**kwargs): + engine = get_db_engine() + query = """ + select distinct + osv.uid_subkonto2 uid_dogovor + , osv.schet as schet + , osv.subkonto2 as name + , null as summa_dogovora + , null as percent_value + from public.oborotno_salbdovaya_vedomostb osv + left join public.fin_porfel fp + on fp.schet = osv.schet + and fp.uid_dogovor = osv.uid_subkonto2 + where (osv.schet like '%%01%%' or osv.schet like '%%03%%') + and fp.uid_dogovor is null + """ + df = pd.read_sql(query, engine) + with engine.begin() as conn: + if not df.empty: + conn.execute("CREATE TEMP TABLE temp_fin_porfel (uid_dogovor text null, schet text null, name text null, summa_dogovora text null, percent_value text null)") + df.to_sql('temp_fin_porfel', con=conn, if_exists='append', index=False, method='multi') + conn.execute(""" + INSERT INTO public.fin_porfel (uid_dogovor, schet, name, summa_dogovora, percent_value) + SELECT DISTINCT + uid_dogovor + , schet + , name + , summa_dogovora + , percent_value + FROM temp_fin_porfel + ON CONFLICT (uid_dogovor, schet) + DO UPDATE SET + name = EXCLUDED.name, + summa_dogovora = EXCLUDED.summa_dogovora, + percent_value = EXCLUDED.percent_value + """ + ) + conn.execute(""" + UPDATE public.fin_porfel fp + SET id = subquery.new_id + FROM ( + SELECT + id, + ROW_NUMBER() OVER (ORDER BY uid_dogovor, schet, summa_dogovora) as new_id + FROM public.fin_porfel + ) AS subquery + WHERE fp.id = subquery.id; + """ + ) + return 'Список обновлен.' + else: + return 'Обновлять нечего.' + with DAG( dag_id='data_download_from_1C_source_with_docs', default_args=default_args, @@ -253,4 +306,10 @@ with DAG( provide_context=True ) -read_data_1C_task \ No newline at end of file + upsert_list_fin_portfel_task = PythonOperator( + task_id='upsert_list_fin_portfel', + python_callable=upsert_list_fin_portfel, + provide_context=True + ) + +read_data_1C_task >> upsert_list_fin_portfel_task \ No newline at end of file