import requests import json import pandas as pd import numpy as np from datetime import datetime from requests.auth import HTTPBasicAuth from sqlalchemy import create_engine from airflow import DAG from airflow.operators.python import PythonOperator default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2023, 1, 1), 'retries': 1, } def get_db_engine(): """Создает подключение к PostgreSQL""" DF_CONFIG = { 'dbname': "postgres", 'user': "postgres", 'password': "4a00d4b90cd830da0796", 'host': "postgresql", 'port': "5432" } return create_engine( f"postgresql+psycopg2://{DF_CONFIG['user']}:{DF_CONFIG['password']}@" f"{DF_CONFIG['host']}:{DF_CONFIG['port']}/{DF_CONFIG['dbname']}", pool_size=10, max_overflow=20 ) def read_data_1C(**kwargs): params = {"СписокСчетов": ["66","66.01","66.02", "66.03","66.04","66.21","66.22","66.23","66.24", "67","67.01","67.02", "67.03","67.04","67.21","67.22","67.23","67.24"]} query = """ВЫБРАТЬ ОстаткиОбороты.Счет, ОстаткиОбороты.Субконто1, ОстаткиОбороты.Субконто2, ОстаткиОбороты.Организация, ОстаткиОбороты.СуммаОборот, ОстаткиОбороты.СуммаОборотДт, ОстаткиОбороты.СуммаОборотКт, ОстаткиОбороты.СуммаКонечныйОстаток, ОстаткиОбороты.СуммаКонечныйОстатокДт, ОстаткиОбороты.СуммаКонечныйОстатокКт, ОстаткиОбороты.СуммаКонечныйРазвернутыйОстатокДт, ОстаткиОбороты.СуммаКонечныйРазвернутыйОстатокКт, // Добавляем контрагента из договора ДоговорыКонтрагентов.Владелец КАК Контрагент, ДоговорыКонтрагентов.Номер КАК НомерДоговора, ДоговорыКонтрагентов.Дата КАК ДатаДоговора, ДоговорыКонтрагентов.СрокДействия КАК СрокДействияДоговора ИЗ РегистрБухгалтерии.Хозрасчетный.ОстаткиИОбороты КАК ОстаткиОбороты ЛЕВОЕ СОЕДИНЕНИЕ Справочник.ДоговорыКонтрагентов КАК ДоговорыКонтрагентов ПО ОстаткиОбороты.Субконто2 = ДоговорыКонтрагентов.Ссылка ГДЕ ОстаткиОбороты.Счет.Код В (&СписокСчетов) """ auth = HTTPBasicAuth('obmen', 'bOR2W7w4') response = requests.post( # http://адрес сервера/название БД/hs/services/query?ProcessPostedRequest # url=r'http://192.168.1.75/chupd/hs/services/query?ProcessPostedRequest', json={"query":query, "params": params}, auth=auth, verify=False ) data_from_1c = response.json() df = pd.DataFrame(data_from_1c['data']) print(df) # engine = get_db_engine() # table_name = 'oborotno_salbdovaya_vedomostb' # field_mapping = { # # Основные поля # 'Счет': 'schet', # 'Субконто1': 'subkonto1', # 'Субконто2': 'subkonto2', # # 'Субконто3': 'subkonto3', # 'Организация': 'organizaciya', # # 'Валюта': 'valyuta', # # Суммовые остатки и обороты # # 'СуммаНачальныйОстаток': 'summa_nachalnyy_ostatok', # # 'СуммаНачальныйОстатокДт': 'summa_nachalnyy_ostatok_dt', # # 'СуммаНачальныйОстатокКт': 'summa_nachalnyy_ostatok_kt', # # 'СуммаНачальныйРазвернутыйОстатокДт': 'summa_nachalnyy_razvernutyy_ostatok_dt', # # 'СуммаНачальныйРазвернутыйОстатокКт': 'summa_nachalnyy_razvernutyy_ostatok_kt', # 'СуммаОборот': 'summa_oborot', # 'СуммаОборотДт': 'summa_oborot_dt', # 'СуммаОборотКт': 'summa_oborot_kt', # 'СуммаКонечныйОстаток': 'summa_konechnyy_ostatok', # 'СуммаКонечныйОстатокДт': 'summa_konechnyy_ostatok_dt', # 'СуммаКонечныйОстатокКт': 'summa_konechnyy_ostatok_kt', # 'СуммаКонечныйРазвернутыйОстатокДт': 'summa_konechnyy_razvernutyy_ostatok_dt', # 'СуммаКонечныйРазвернутыйОстатокКт': 'summa_konechnyy_razvernutyy_ostatok_kt' # # Валютные остатки и обороты # # 'ВалютнаяСуммаНачальныйОстаток': 'valyutnaya_summa_nachalnyy_ostatok', # # 'ВалютнаяСуммаНачальныйОстатокДт': 'valyutnaya_summa_nachalnyy_ostatok_dt', # # 'ВалютнаяСуммаНачальныйОстатокКт': 'valyutnaya_summa_nachalnyy_ostatok_kt', # # 'ВалютнаяСуммаНачальныйРазвернутыйОстатокДт': 'valyutnaya_summa_nachalnyy_razvernutyy_ostatok_dt', # # 'ВалютнаяСуммаНачальныйРазвернутыйОстатокКт': 'valyutnaya_summa_nachalnyy_razvernutyy_ostatok_kt', # # 'ВалютнаяСуммаОборот': 'valyutnaya_summa_oborot', # # 'ВалютнаяСуммаОборотДт': 'valyutnaya_summa_oborot_dt', # # 'ВалютнаяСуммаОборотКт': 'valyutnaya_summa_oborot_kt', # # 'ВалютнаяСуммаКонечныйОстаток': 'valyutnaya_summa_konechnyy_ostatok', # # 'ВалютнаяСуммаКонечныйОстатокДт': 'valyutnaya_summa_konechnyy_ostatok_dt', # # 'ВалютнаяСуммаКонечныйОстатокКт': 'valyutnaya_summa_konechnyy_ostatok_kt', # # 'ВалютнаяСуммаКонечныйРазвернутыйОстатокДт': 'valyutnaya_summa_konechnyy_razvernutyy_ostatok_dt', # # 'ВалютнаяСуммаКонечныйРазвернутыйОстатокКт': 'valyutnaya_summa_konechnyy_razvernutyy_ostatok_kt', # # Количественные остатки и обороты # # 'КоличествоНачальныйОстаток': 'kolichestvo_nachalnyy_ostatok', # # 'КоличествоНачальныйОстатокДт': 'kolichestvo_nachalnyy_ostatok_dt', # # 'КоличествоНачальныйОстатокКт': 'kolichestvo_nachalnyy_ostatok_kt', # # 'КоличествоНачальныйРазвернутыйОстатокДт': 'kolichestvo_nachalnyy_razvernutyy_ostatok_dt', # # 'КоличествоНачальныйРазвернутыйОстатокКт': 'kolichestvo_nachalnyy_razvernutyy_ostatok_kt', # # 'КоличествоОборот': 'kolichestvo_oborot', # # 'КоличествоОборотДт': 'kolichestvo_oborot_dt', # # 'КоличествоОборотКт': 'kolichestvo_oborot_kt', # # 'КоличествоКонечныйОстаток': 'kolichestvo_konechnyy_ostatok', # # 'КоличествоКонечныйОстатокДт': 'kolichestvo_konechnyy_ostatok_dt', # # 'КоличествоКонечныйОстатокКт': 'kolichestvo_konechnyy_ostatok_kt', # # 'КоличествоКонечныйРазвернутыйОстатокДт': 'kolichestvo_konechnyy_razvernutyy_ostatok_dt', # # 'КоличествоКонечныйРазвернутыйОстатокКт': 'kolichestvo_konechnyy_razvernutyy_ostatok_kt' # } # df = df.rename(columns=field_mapping) # with engine.begin() as conn: # if not df.empty: # conn.execute(f"CREATE TEMP TABLE temp_{table_name} AS SELECT * FROM public.{table_name} WHERE 1 = 0") # df.to_sql( # f'temp_{table_name}', # con=conn, # if_exists='append', # index=False, # method='multi' # ) # conn.execute(f"DELETE FROM public.{table_name} where get_date = CURRENT_DATE::date::timestamp") # conn.execute(f""" # INSERT INTO public.{table_name} # SELECT # schet # , subkonto1 # , subkonto2 # , organizaciya # , summa_oborot # , summa_oborot_dt # , summa_oborot_kt # , summa_konechnyy_ostatok # , summa_konechnyy_ostatok_dt # , summa_konechnyy_ostatok_kt # , summa_konechnyy_razvernutyy_ostatok_dt # , summa_konechnyy_razvernutyy_ostatok_kt # , CURRENT_DATE::date::timestamp # FROM temp_{table_name} # --ON CONFLICT (schet, subkonto1, subkonto2, organizaciya) # """ # ) with DAG( dag_id='data_download_from_1C_source', default_args=default_args, description='Выгрузка данных из 1С', schedule_interval=None, #"0,30 01-10 * * *", catchup=False, tags=['sigma'], ) as dag: read_data_1C_task = PythonOperator( task_id="read_data_1C", python_callable=read_data_1C, provide_context=True ) read_data_1C_task