import requests import json import pandas as pd import numpy as np from datetime import datetime from requests.auth import HTTPBasicAuth from sqlalchemy import create_engine from airflow import DAG from airflow.operators.python import PythonOperator default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2023, 1, 1), 'retries': 1, } def get_db_engine(): """Создает подключение к PostgreSQL""" DF_CONFIG = { 'dbname': "postgres", 'user': "postgres", 'password': "4a00d4b90cd830da0796", 'host': "postgresql", 'port': "5432" } return create_engine( f"postgresql+psycopg2://{DF_CONFIG['user']}:{DF_CONFIG['password']}@" f"{DF_CONFIG['host']}:{DF_CONFIG['port']}/{DF_CONFIG['dbname']}", pool_size=10, max_overflow=20 ) def take_dogovor(**kwargs): params = {"СписокСчетов": ["66","66.01","66.02", "66.03","66.04","66.21","66.22","66.23","66.24", "67","67.01","67.02", "67.03","67.04","67.21","67.22","67.23","67.24"]} query = """ВЫБРАТЬ РАЗЛИЧНЫЕ ОстаткиОбороты.Субконто2.Наименование КАК Наименование, ОстаткиОбороты.Субконто2.Номер КАК Номер, ОстаткиОбороты.Субконто2.Дата КАК Дата, ОстаткиОбороты.Субконто2.СрокДействия КАК СрокДействия, UUID(ОстаткиОбороты.Субконто2.Ссылка) КАК ИдентификаторДоговора ИЗ РегистрБухгалтерии.Хозрасчетный.ОстаткиИОбороты КАК ОстаткиОбороты ГДЕ ОстаткиОбороты.Счет.Код В (&СписокСчетов) И ОстаткиОбороты.Субконто2 ЕСТЬ НЕ NULL """ auth = HTTPBasicAuth('obmen', 'bOR2W7w4') response = requests.post( # http://адрес сервера/название БД/hs/services/query?ProcessPostedRequest url=r'http://192.168.1.75/chupd/hs/services/query?ProcessPostedRequest', json={"query":query, "params": params}, auth=auth, verify=False ) data_from_1c = response.json() df = pd.DataFrame(data_from_1c['data']) engine = get_db_engine() table_name = 'dict_dogovor' field_mapping = { 'Наименование': 'name', 'Номер': 'nomer', 'Дата': 'date_begin', 'СрокДействия': 'date_end', 'ИдентификаторДоговора': 'uid_dogovor' } df = df.rename(columns=field_mapping) with engine.begin() as conn: if not df.empty: conn.execute(f"CREATE TEMP TABLE temp_{table_name} AS SELECT * FROM public.{table_name} WHERE 1 = 0") df.to_sql( f'temp_{table_name}', con=conn, if_exists='append', index=False, method='multi' ) # conn.execute(f"DELETE FROM public.{table_name}") conn.execute(f""" INSERT INTO public.{table_name} (id, name, nomer, date_begin, date_end, uid_dogovor) SELECT row_number() over (order by uid_dogovor) id, name, nomer, date_begin, date_end, uid_dogovor FROM temp_{table_name} ON CONFLICT (uid_dogovor) DO UPDATE SET id = EXCLUDED.id, name = EXCLUDED.name, nomer = EXCLUDED.nomer, date_begin = EXCLUDED.date_begin, date_end = EXCLUDED.date_end; """ ) def take_bank(**kwargs): params = {"СписокСчетов": ["66","66.01","66.02", "66.03","66.04","66.21","66.22","66.23","66.24", "67","67.01","67.02", "67.03","67.04","67.21","67.22","67.23","67.24"]} query = """ВЫБРАТЬ РАЗЛИЧНЫЕ ОстаткиОбороты.Субконто1.Наименование КАК Наименование, ОстаткиОбороты.Субконто1.ИНН КАК ИНН, ОстаткиОбороты.Субконто1.КПП КАК КПП, UUID(ОстаткиОбороты.Субконто1.Ссылка) КАК ИдентификаторКонтрагента ИЗ РегистрБухгалтерии.Хозрасчетный.ОстаткиИОбороты КАК ОстаткиОбороты ГДЕ ОстаткиОбороты.Счет.Код В (&СписокСчетов) И ОстаткиОбороты.Субконто1 ЕСТЬ НЕ NULL """ auth = HTTPBasicAuth('obmen', 'bOR2W7w4') response = requests.post( # http://адрес сервера/название БД/hs/services/query?ProcessPostedRequest url=r'http://192.168.1.75/chupd/hs/services/query?ProcessPostedRequest', json={"query":query, "params": params}, auth=auth, verify=False ) data_from_1c = response.json() df = pd.DataFrame(data_from_1c['data']) engine = get_db_engine() table_name = 'dict_bank' field_mapping = { 'Наименование': 'name', 'ИНН': 'inn', 'КПП': 'kpp', 'ИдентификаторКонтрагента': 'uid_bank' } df = df.rename(columns=field_mapping) with engine.begin() as conn: if not df.empty: conn.execute(f"CREATE TEMP TABLE temp_{table_name} AS SELECT * FROM public.{table_name} WHERE 1 = 0") df.to_sql( f'temp_{table_name}', con=conn, if_exists='append', index=False, method='multi' ) # conn.execute(f"DELETE FROM public.{table_name}") conn.execute(f""" INSERT INTO public.{table_name} (id, name, inn, kpp, uid_bank) SELECT row_number() over (order by uid_bank) id, name, inn, kpp, uid_bank FROM temp_{table_name} ON CONFLICT (uid_bank) DO UPDATE SET id = EXCLUDED.id, name = EXCLUDED.name, inn = EXCLUDED.inn, kpp = EXCLUDED.kpp; """ ) def take_organizaciya(**kwargs): params = {"СписокСчетов": ["66","66.01","66.02", "66.03","66.04","66.21","66.22","66.23","66.24", "67","67.01","67.02", "67.03","67.04","67.21","67.22","67.23","67.24"]} query = """ВЫБРАТЬ РАЗЛИЧНЫЕ ОстаткиОбороты.Организация.Наименование КАК Наименование, ОстаткиОбороты.Организация.ИНН КАК ИНН, ОстаткиОбороты.Организация.КПП КАК КПП, UUID(ОстаткиОбороты.Организация.Ссылка) КАК ИдентификаторОрганизации ИЗ РегистрБухгалтерии.Хозрасчетный.ОстаткиИОбороты КАК ОстаткиОбороты ГДЕ ОстаткиОбороты.Счет.Код В (&СписокСчетов) """ auth = HTTPBasicAuth('obmen', 'bOR2W7w4') response = requests.post( # http://адрес сервера/название БД/hs/services/query?ProcessPostedRequest url=r'http://192.168.1.75/chupd/hs/services/query?ProcessPostedRequest', json={"query":query, "params": params}, auth=auth, verify=False ) data_from_1c = response.json() df = pd.DataFrame(data_from_1c['data']) engine = get_db_engine() table_name = 'dict_organizaciya' field_mapping = { 'Наименование': 'name', 'ИНН': 'inn', 'КПП': 'kpp', 'ИдентификаторОрганизации': 'uid_organizaciya' } df = df.rename(columns=field_mapping) with engine.begin() as conn: if not df.empty: conn.execute(f"CREATE TEMP TABLE temp_{table_name} AS SELECT * FROM public.{table_name} WHERE 1 = 0") df.to_sql( f'temp_{table_name}', con=conn, if_exists='append', index=False, method='multi' ) # conn.execute(f"DELETE FROM public.{table_name}") conn.execute(f""" INSERT INTO public.{table_name} (id, name, inn, kpp, uid_organizaciya) SELECT row_number() over (order by uid_organizaciya) id, name, inn, kpp, uid_organizaciya FROM temp_{table_name} ON CONFLICT (uid_organizaciya) DO UPDATE SET id = EXCLUDED.id, name = EXCLUDED.name, inn = EXCLUDED.inn, kpp = EXCLUDED.kpp; """ ) with DAG( dag_id='download_dicts_from_1C_source', default_args=default_args, description='Выгрузка данных из 1С', schedule_interval=None, #"0,30 01-10 * * *", catchup=False, tags=['sigma'], ) as dag: take_dogovor_task = PythonOperator( task_id="take_dogovor", python_callable=take_dogovor, provide_context=True ) take_bank_task = PythonOperator( task_id="take_bank", python_callable=take_bank, provide_context=True ) take_organizaciya_task = PythonOperator( task_id="take_organizaciya", python_callable=take_organizaciya, provide_context=True ) [take_dogovor, take_bank_task, take_organizaciya_task]