diff --git a/dags/update_dicts.py b/dags/update_dicts.py new file mode 100644 index 0000000..4c6cfaf --- /dev/null +++ b/dags/update_dicts.py @@ -0,0 +1,258 @@ +import requests +import json +import pandas as pd +import numpy as np +from datetime import datetime +from requests.auth import HTTPBasicAuth +from sqlalchemy import create_engine +from airflow import DAG +from airflow.operators.python import PythonOperator + +default_args = { + 'owner': 'airflow', + 'depends_on_past': False, + 'start_date': datetime(2023, 1, 1), + 'retries': 1, +} + +def get_db_engine(): + """Создает подключение к PostgreSQL""" + DF_CONFIG = { + 'dbname': "postgres", + 'user': "postgres", + 'password': "4a00d4b90cd830da0796", + 'host': "postgresql", + 'port': "5432" + } + return create_engine( + f"postgresql+psycopg2://{DF_CONFIG['user']}:{DF_CONFIG['password']}@" + f"{DF_CONFIG['host']}:{DF_CONFIG['port']}/{DF_CONFIG['dbname']}", + pool_size=10, + max_overflow=20 + ) + +def take_dogovor(**kwargs): + params = {"СписокСчетов": ["66","66.01","66.02", "66.03","66.04","66.21","66.22","66.23","66.24", "67","67.01","67.02", "67.03","67.04","67.21","67.22","67.23","67.24"]} + query = """ВЫБРАТЬ РАЗЛИЧНЫЕ + ОстаткиОбороты.Субконто2.Наименование КАК Наименование, + ОстаткиОбороты.Субконто2.Номер КАК Номер, + ОстаткиОбороты.Субконто2.Дата КАК Дата, + ОстаткиОбороты.Субконто2.СрокДействия КАК СрокДействия, + UUID(ОстаткиОбороты.Субконто2.Ссылка) КАК ИдентификаторДоговора +ИЗ + РегистрБухгалтерии.Хозрасчетный.ОстаткиИОбороты КАК ОстаткиОбороты +ГДЕ + ОстаткиОбороты.Счет.Код В (&СписокСчетов) + И ОстаткиОбороты.Субконто2 ЕСТЬ НЕ NULL + """ + auth = HTTPBasicAuth('obmen', 'bOR2W7w4') + response = requests.post( + # http://адрес сервера/название БД/hs/services/query?ProcessPostedRequest + + url=r'http://192.168.1.75/chupd/hs/services/query?ProcessPostedRequest', + json={"query":query, "params": params}, + auth=auth, + verify=False + ) + + data_from_1c = response.json() + df = pd.DataFrame(data_from_1c['data']) + engine = get_db_engine() + table_name = 'dict_dogovor' + field_mapping = { + 'Наименование': 'name', + 'Номер': 'nomer', + 'Дата': 'date_begin', + 'СрокДействия': 'date_end', + 'ИдентификаторДоговора': 'uid_dogovor' + } + df = df.rename(columns=field_mapping) + with engine.begin() as conn: + if not df.empty: + conn.execute(f"CREATE TEMP TABLE temp_{table_name} AS SELECT * FROM public.{table_name} WHERE 1 = 0") + df.to_sql( + f'temp_{table_name}', + con=conn, + if_exists='append', + index=False, + method='multi' + ) + conn.execute(f"DELETE FROM public.{table_name}") + conn.execute(f""" + INSERT INTO public.{table_name} (name, nomer, date_begin, date_end, uid_dogovor) + SELECT + name, + nomer, + date_begin, + date_end, + uid_dogovor + FROM temp_{table_name} + ON CONFLICT (uid_dogovor) + DO UPDATE SET + name = EXCLUDED.name, + nomer = EXCLUDED.nomer, + date_begin = EXCLUDED.date_begin, + date_end = EXCLUDED.date_end + WHERE + public.{table_name}.name IS DISTINCT FROM EXCLUDED.name + OR public.{table_name}.nomer IS DISTINCT FROM EXCLUDED.nomer + OR public.{table_name}.date_begin IS DISTINCT FROM EXCLUDED.date_begin + OR public.{table_name}.date_end IS DISTINCT FROM EXCLUDED.date_end; + """ + ) + +def take_bank(**kwargs): + params = {"СписокСчетов": ["66","66.01","66.02", "66.03","66.04","66.21","66.22","66.23","66.24", "67","67.01","67.02", "67.03","67.04","67.21","67.22","67.23","67.24"]} + query = """ВЫБРАТЬ РАЗЛИЧНЫЕ + ОстаткиОбороты.Субконто1.Наименование КАК Наименование, + ОстаткиОбороты.Субконто1.ИНН КАК ИНН, + ОстаткиОбороты.Субконто1.КПП КАК КПП, + UUID(ОстаткиОбороты.Субконто1.Ссылка) КАК ИдентификаторКонтрагента +ИЗ + РегистрБухгалтерии.Хозрасчетный.ОстаткиИОбороты КАК ОстаткиОбороты +ГДЕ + ОстаткиОбороты.Счет.Код В (&СписокСчетов) + И ОстаткиОбороты.Субконто1 ЕСТЬ НЕ NULL + """ + auth = HTTPBasicAuth('obmen', 'bOR2W7w4') + response = requests.post( + # http://адрес сервера/название БД/hs/services/query?ProcessPostedRequest + + url=r'http://192.168.1.75/chupd/hs/services/query?ProcessPostedRequest', + json={"query":query, "params": params}, + auth=auth, + verify=False + ) + + data_from_1c = response.json() + df = pd.DataFrame(data_from_1c['data']) + engine = get_db_engine() + table_name = 'dict_bank' + field_mapping = { + 'Наименование': 'name', + 'ИНН': 'inn', + 'КПП': 'kpp', + 'ИдентификаторКонтрагента': 'uid_bank' + } + df = df.rename(columns=field_mapping) + with engine.begin() as conn: + if not df.empty: + conn.execute(f"CREATE TEMP TABLE temp_{table_name} AS SELECT * FROM public.{table_name} WHERE 1 = 0") + df.to_sql( + f'temp_{table_name}', + con=conn, + if_exists='append', + index=False, + method='multi' + ) + conn.execute(f"DELETE FROM public.{table_name}") + conn.execute(f""" + INSERT INTO public.{table_name} (name, inn, kpp, uid_bank) + SELECT + name, + inn, + kpp, + uid_bank + FROM temp_{table_name} + ON CONFLICT (uid_bank) + DO UPDATE SET + name = EXCLUDED.name, + inn = EXCLUDED.inn, + kpp = EXCLUDED.kpp + WHERE + public.{table_name}.name IS DISTINCT FROM EXCLUDED.name + OR public.{table_name}.inn IS DISTINCT FROM EXCLUDED.inn + OR public.{table_name}.kpp IS DISTINCT FROM EXCLUDED.kpp + """ + ) + +def take_organizaciya(**kwargs): + params = {"СписокСчетов": ["66","66.01","66.02", "66.03","66.04","66.21","66.22","66.23","66.24", "67","67.01","67.02", "67.03","67.04","67.21","67.22","67.23","67.24"]} + query = """ВЫБРАТЬ РАЗЛИЧНЫЕ + ОстаткиОбороты.Организация.Наименование КАК Наименование, + ОстаткиОбороты.Организация.ИНН КАК ИНН, + ОстаткиОбороты.Организация.КПП КАК КПП, + UUID(ОстаткиОбороты.Организация.Ссылка) КАК ИдентификаторОрганизации +ИЗ + РегистрБухгалтерии.Хозрасчетный.ОстаткиИОбороты КАК ОстаткиОбороты +ГДЕ + ОстаткиОбороты.Счет.Код В (&СписокСчетов) + """ + auth = HTTPBasicAuth('obmen', 'bOR2W7w4') + response = requests.post( + # http://адрес сервера/название БД/hs/services/query?ProcessPostedRequest + + url=r'http://192.168.1.75/chupd/hs/services/query?ProcessPostedRequest', + json={"query":query, "params": params}, + auth=auth, + verify=False + ) + + data_from_1c = response.json() + df = pd.DataFrame(data_from_1c['data']) + engine = get_db_engine() + table_name = 'dict_organizaciya' + field_mapping = { + 'Наименование': 'name', + 'ИНН': 'inn', + 'КПП': 'kpp', + 'ИдентификаторОрганизации': 'uid_organizaciya' + } + df = df.rename(columns=field_mapping) + with engine.begin() as conn: + if not df.empty: + conn.execute(f"CREATE TEMP TABLE temp_{table_name} AS SELECT * FROM public.{table_name} WHERE 1 = 0") + df.to_sql( + f'temp_{table_name}', + con=conn, + if_exists='append', + index=False, + method='multi' + ) + conn.execute(f"DELETE FROM public.{table_name}") + conn.execute(f""" + INSERT INTO public.{table_name} (name, inn, kpp, uid_organizaciya) + SELECT + name, + inn, + kpp, + uid_organizaciya + FROM temp_{table_name} + ON CONFLICT (uid_organizaciya) + DO UPDATE SET + name = EXCLUDED.name, + inn = EXCLUDED.inn, + kpp = EXCLUDED.kpp + WHERE + public.{table_name}.name IS DISTINCT FROM EXCLUDED.name + OR public.{table_name}.inn IS DISTINCT FROM EXCLUDED.inn + OR public.{table_name}.kpp IS DISTINCT FROM EXCLUDED.kpp + """ + ) + +with DAG( + dag_id='download_dicts_from_1C_source', + default_args=default_args, + description='Выгрузка данных из 1С', + schedule_interval=None, #"0,30 01-10 * * *", + catchup=False, + tags=['sigma'], +) as dag: + + take_dogovor_task = PythonOperator( + task_id="take_dogovor", + python_callable=take_dogovor, + provide_context=True + ) + take_bank_task = PythonOperator( + task_id="take_bank", + python_callable=take_bank, + provide_context=True + ) + take_organizaciya_task = PythonOperator( + task_id="take_organizaciya", + python_callable=take_organizaciya, + provide_context=True + ) + +[take_dogovor, take_bank_task, take_organizaciya_task]