From 893e36809ed3f082981fae14ae288e94f6e038b1 Mon Sep 17 00:00:00 2001 From: bn_user Date: Thu, 13 Nov 2025 15:57:23 +0000 Subject: [PATCH] =?UTF-8?q?=D0=A3=D0=B4=D0=B0=D0=BB=D0=B8=D1=82=D1=8C=20da?= =?UTF-8?q?gs/update=5Fdicts.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dags/update_dicts.py | 258 ------------------------------------------- 1 file changed, 258 deletions(-) delete mode 100644 dags/update_dicts.py diff --git a/dags/update_dicts.py b/dags/update_dicts.py deleted file mode 100644 index 4c6cfaf..0000000 --- a/dags/update_dicts.py +++ /dev/null @@ -1,258 +0,0 @@ -import requests -import json -import pandas as pd -import numpy as np -from datetime import datetime -from requests.auth import HTTPBasicAuth -from sqlalchemy import create_engine -from airflow import DAG -from airflow.operators.python import PythonOperator - -default_args = { - 'owner': 'airflow', - 'depends_on_past': False, - 'start_date': datetime(2023, 1, 1), - 'retries': 1, -} - -def get_db_engine(): - """Создает подключение к PostgreSQL""" - DF_CONFIG = { - 'dbname': "postgres", - 'user': "postgres", - 'password': "4a00d4b90cd830da0796", - 'host': "postgresql", - 'port': "5432" - } - return create_engine( - f"postgresql+psycopg2://{DF_CONFIG['user']}:{DF_CONFIG['password']}@" - f"{DF_CONFIG['host']}:{DF_CONFIG['port']}/{DF_CONFIG['dbname']}", - pool_size=10, - max_overflow=20 - ) - -def take_dogovor(**kwargs): - params = {"СписокСчетов": ["66","66.01","66.02", "66.03","66.04","66.21","66.22","66.23","66.24", "67","67.01","67.02", "67.03","67.04","67.21","67.22","67.23","67.24"]} - query = """ВЫБРАТЬ РАЗЛИЧНЫЕ - ОстаткиОбороты.Субконто2.Наименование КАК Наименование, - ОстаткиОбороты.Субконто2.Номер КАК Номер, - ОстаткиОбороты.Субконто2.Дата КАК Дата, - ОстаткиОбороты.Субконто2.СрокДействия КАК СрокДействия, - UUID(ОстаткиОбороты.Субконто2.Ссылка) КАК ИдентификаторДоговора -ИЗ - РегистрБухгалтерии.Хозрасчетный.ОстаткиИОбороты КАК ОстаткиОбороты -ГДЕ - ОстаткиОбороты.Счет.Код В (&СписокСчетов) - И ОстаткиОбороты.Субконто2 ЕСТЬ НЕ NULL - """ - auth = HTTPBasicAuth('obmen', 'bOR2W7w4') - response = requests.post( - # http://адрес сервера/название БД/hs/services/query?ProcessPostedRequest - - url=r'http://192.168.1.75/chupd/hs/services/query?ProcessPostedRequest', - json={"query":query, "params": params}, - auth=auth, - verify=False - ) - - data_from_1c = response.json() - df = pd.DataFrame(data_from_1c['data']) - engine = get_db_engine() - table_name = 'dict_dogovor' - field_mapping = { - 'Наименование': 'name', - 'Номер': 'nomer', - 'Дата': 'date_begin', - 'СрокДействия': 'date_end', - 'ИдентификаторДоговора': 'uid_dogovor' - } - df = df.rename(columns=field_mapping) - with engine.begin() as conn: - if not df.empty: - conn.execute(f"CREATE TEMP TABLE temp_{table_name} AS SELECT * FROM public.{table_name} WHERE 1 = 0") - df.to_sql( - f'temp_{table_name}', - con=conn, - if_exists='append', - index=False, - method='multi' - ) - conn.execute(f"DELETE FROM public.{table_name}") - conn.execute(f""" - INSERT INTO public.{table_name} (name, nomer, date_begin, date_end, uid_dogovor) - SELECT - name, - nomer, - date_begin, - date_end, - uid_dogovor - FROM temp_{table_name} - ON CONFLICT (uid_dogovor) - DO UPDATE SET - name = EXCLUDED.name, - nomer = EXCLUDED.nomer, - date_begin = EXCLUDED.date_begin, - date_end = EXCLUDED.date_end - WHERE - public.{table_name}.name IS DISTINCT FROM EXCLUDED.name - OR public.{table_name}.nomer IS DISTINCT FROM EXCLUDED.nomer - OR public.{table_name}.date_begin IS DISTINCT FROM EXCLUDED.date_begin - OR public.{table_name}.date_end IS DISTINCT FROM EXCLUDED.date_end; - """ - ) - -def take_bank(**kwargs): - params = {"СписокСчетов": ["66","66.01","66.02", "66.03","66.04","66.21","66.22","66.23","66.24", "67","67.01","67.02", "67.03","67.04","67.21","67.22","67.23","67.24"]} - query = """ВЫБРАТЬ РАЗЛИЧНЫЕ - ОстаткиОбороты.Субконто1.Наименование КАК Наименование, - ОстаткиОбороты.Субконто1.ИНН КАК ИНН, - ОстаткиОбороты.Субконто1.КПП КАК КПП, - UUID(ОстаткиОбороты.Субконто1.Ссылка) КАК ИдентификаторКонтрагента -ИЗ - РегистрБухгалтерии.Хозрасчетный.ОстаткиИОбороты КАК ОстаткиОбороты -ГДЕ - ОстаткиОбороты.Счет.Код В (&СписокСчетов) - И ОстаткиОбороты.Субконто1 ЕСТЬ НЕ NULL - """ - auth = HTTPBasicAuth('obmen', 'bOR2W7w4') - response = requests.post( - # http://адрес сервера/название БД/hs/services/query?ProcessPostedRequest - - url=r'http://192.168.1.75/chupd/hs/services/query?ProcessPostedRequest', - json={"query":query, "params": params}, - auth=auth, - verify=False - ) - - data_from_1c = response.json() - df = pd.DataFrame(data_from_1c['data']) - engine = get_db_engine() - table_name = 'dict_bank' - field_mapping = { - 'Наименование': 'name', - 'ИНН': 'inn', - 'КПП': 'kpp', - 'ИдентификаторКонтрагента': 'uid_bank' - } - df = df.rename(columns=field_mapping) - with engine.begin() as conn: - if not df.empty: - conn.execute(f"CREATE TEMP TABLE temp_{table_name} AS SELECT * FROM public.{table_name} WHERE 1 = 0") - df.to_sql( - f'temp_{table_name}', - con=conn, - if_exists='append', - index=False, - method='multi' - ) - conn.execute(f"DELETE FROM public.{table_name}") - conn.execute(f""" - INSERT INTO public.{table_name} (name, inn, kpp, uid_bank) - SELECT - name, - inn, - kpp, - uid_bank - FROM temp_{table_name} - ON CONFLICT (uid_bank) - DO UPDATE SET - name = EXCLUDED.name, - inn = EXCLUDED.inn, - kpp = EXCLUDED.kpp - WHERE - public.{table_name}.name IS DISTINCT FROM EXCLUDED.name - OR public.{table_name}.inn IS DISTINCT FROM EXCLUDED.inn - OR public.{table_name}.kpp IS DISTINCT FROM EXCLUDED.kpp - """ - ) - -def take_organizaciya(**kwargs): - params = {"СписокСчетов": ["66","66.01","66.02", "66.03","66.04","66.21","66.22","66.23","66.24", "67","67.01","67.02", "67.03","67.04","67.21","67.22","67.23","67.24"]} - query = """ВЫБРАТЬ РАЗЛИЧНЫЕ - ОстаткиОбороты.Организация.Наименование КАК Наименование, - ОстаткиОбороты.Организация.ИНН КАК ИНН, - ОстаткиОбороты.Организация.КПП КАК КПП, - UUID(ОстаткиОбороты.Организация.Ссылка) КАК ИдентификаторОрганизации -ИЗ - РегистрБухгалтерии.Хозрасчетный.ОстаткиИОбороты КАК ОстаткиОбороты -ГДЕ - ОстаткиОбороты.Счет.Код В (&СписокСчетов) - """ - auth = HTTPBasicAuth('obmen', 'bOR2W7w4') - response = requests.post( - # http://адрес сервера/название БД/hs/services/query?ProcessPostedRequest - - url=r'http://192.168.1.75/chupd/hs/services/query?ProcessPostedRequest', - json={"query":query, "params": params}, - auth=auth, - verify=False - ) - - data_from_1c = response.json() - df = pd.DataFrame(data_from_1c['data']) - engine = get_db_engine() - table_name = 'dict_organizaciya' - field_mapping = { - 'Наименование': 'name', - 'ИНН': 'inn', - 'КПП': 'kpp', - 'ИдентификаторОрганизации': 'uid_organizaciya' - } - df = df.rename(columns=field_mapping) - with engine.begin() as conn: - if not df.empty: - conn.execute(f"CREATE TEMP TABLE temp_{table_name} AS SELECT * FROM public.{table_name} WHERE 1 = 0") - df.to_sql( - f'temp_{table_name}', - con=conn, - if_exists='append', - index=False, - method='multi' - ) - conn.execute(f"DELETE FROM public.{table_name}") - conn.execute(f""" - INSERT INTO public.{table_name} (name, inn, kpp, uid_organizaciya) - SELECT - name, - inn, - kpp, - uid_organizaciya - FROM temp_{table_name} - ON CONFLICT (uid_organizaciya) - DO UPDATE SET - name = EXCLUDED.name, - inn = EXCLUDED.inn, - kpp = EXCLUDED.kpp - WHERE - public.{table_name}.name IS DISTINCT FROM EXCLUDED.name - OR public.{table_name}.inn IS DISTINCT FROM EXCLUDED.inn - OR public.{table_name}.kpp IS DISTINCT FROM EXCLUDED.kpp - """ - ) - -with DAG( - dag_id='download_dicts_from_1C_source', - default_args=default_args, - description='Выгрузка данных из 1С', - schedule_interval=None, #"0,30 01-10 * * *", - catchup=False, - tags=['sigma'], -) as dag: - - take_dogovor_task = PythonOperator( - task_id="take_dogovor", - python_callable=take_dogovor, - provide_context=True - ) - take_bank_task = PythonOperator( - task_id="take_bank", - python_callable=take_bank, - provide_context=True - ) - take_organizaciya_task = PythonOperator( - task_id="take_organizaciya", - python_callable=take_organizaciya, - provide_context=True - ) - -[take_dogovor, take_bank_task, take_organizaciya_task]