import requests import json import pandas as pd import numpy as np from datetime import datetime, timedelta from requests.auth import HTTPBasicAuth from sqlalchemy import create_engine from airflow import DAG from airflow.operators.python import PythonOperator default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2023, 1, 1), 'retries': 1, } def get_db_engine(): """Создает подключение к PostgreSQL""" DF_CONFIG = { 'dbname': "postgres", 'user': "postgres", 'password': "4a00d4b90cd830da0796", 'host': "postgresql", 'port': "5432" } return create_engine( f"postgresql+psycopg2://{DF_CONFIG['user']}:{DF_CONFIG['password']}@" f"{DF_CONFIG['host']}:{DF_CONFIG['port']}/{DF_CONFIG['dbname']}", pool_size=10, max_overflow=20 ) def read_data_1C(**kwargs): """ Создает DataFrame с ежедневным календарем и останавливается на дате T-2 Параметры: start_year (int): начальный год (по умолчанию 2019) end_year (int): конечный год (по умолчанию 2025) Возвращает: pandas.DataFrame: DataFrame с колонками Год, Месяц, День """ start_year = 2019 # Вычисляем дату T-2 (текущая дата минус 2 дня) t_minus_2 = datetime.now() - timedelta(days=2) t_minus_2 = t_minus_2.replace(hour=0, minute=0, second=0, microsecond=0) print(f"Текущая дата: {datetime.now().strftime('%Y-%m-%d')}") print(f"Дата T-2: {t_minus_2.strftime('%Y-%m-%d')}") # Создаем диапазон дат от начала до T-2 start_date = f'{start_year}-01-01' date_range = pd.date_range(start=start_date, end=t_minus_2, freq='D') # Создаем пустой список для хранения данных calendar_data = [] params = {"СписокСчетов": ["66","66.01","66.02", "66.03","66.04","66.21","66.22","66.23","66.24", "67","67.01","67.02", "67.03","67.04","67.21","67.22","67.23","67.24"]} # Проходим циклом по каждому дню print("\nПрохождение по дням календаря:") for i, date in enumerate(date_range): query = f"""ВЫБРАТЬ ОстаткиОбороты.Счет, ОстаткиОбороты.Субконто1, ОстаткиОбороты.Субконто2, ОстаткиОбороты.Организация, ОстаткиОбороты.Субконто2.Номер КАК НомерДоговора, ОстаткиОбороты.Субконто2.Дата КАК ДатаДоговора, ОстаткиОбороты.Субконто2.СрокДействия КАК СрокДействияДоговора, ОстаткиОбороты.Субконто1.Инн КАК ИннКонтрагента, ОстаткиОбороты.Организация.Инн КАК ИннКлиента, ОстаткиОбороты.СуммаОборот, ОстаткиОбороты.СуммаОборотДт, ОстаткиОбороты.СуммаОборотКт, ОстаткиОбороты.СуммаКонечныйОстаток, ОстаткиОбороты.СуммаКонечныйОстатокДт, ОстаткиОбороты.СуммаКонечныйОстатокКт, ОстаткиОбороты.СуммаКонечныйРазвернутыйОстатокДт, ОстаткиОбороты.СуммаКонечныйРазвернутыйОстатокКт, UUID(ОстаткиОбороты.Субконто2.Ссылка) КАК ИдентификаторДоговора, UUID(ОстаткиОбороты.Субконто1.Ссылка) КАК ИдентификаторКонтрагента, UUID(ОстаткиОбороты.Организация.Ссылка) КАК ИдентификаторКлиента, ОстаткиОбороты.Период КАК ДатаОтчета ИЗ РегистрБухгалтерии.Хозрасчетный.ОстаткиИОбороты( ДАТАВРЕМЯ({date.year},{date.month},{date.day}), ДАТАВРЕМЯ({date.year},{date.month},{date.day}), Регистратор, , Счет.Код в (&СписокСчетов), , ) КАК ОстаткиОбороты """ auth = HTTPBasicAuth('obmen', 'bOR2W7w4') response = requests.post( # http://адрес сервера/название БД/hs/services/query?ProcessPostedRequest url=r'http://192.168.1.75/chupd/hs/services/query?ProcessPostedRequest', json={"query":query, "params": params}, auth=auth, verify=False ) data_from_1c = response.json() df = pd.DataFrame(data_from_1c['data']) engine = get_db_engine() table_name = 'oborotno_salbdovaya_vedomostb' temp_table_name = f'temp_{table_name}' field_mapping = { # Основные поля 'Счет': 'schet', 'Субконто1': 'subkonto1', 'Субконто2': 'subkonto2', 'Организация': 'organizaciya', 'НомерДоговора': 'nomer', 'ДатаДоговора': 'date_begin', 'СрокДействияДоговора': 'date_end', 'ИннКонтрагента': 'inn_subkonto1', 'ИннКлиента': 'inn_organizaciya', # Суммовые остатки и обороты 'СуммаОборот': 'summa_oborot', 'СуммаОборотДт': 'summa_oborot_dt', 'СуммаОборотКт': 'summa_oborot_kt', 'СуммаКонечныйОстаток': 'summa_konechnyy_ostatok', 'СуммаКонечныйОстатокДт': 'summa_konechnyy_ostatok_dt', 'СуммаКонечныйОстатокКт': 'summa_konechnyy_ostatok_kt', 'СуммаКонечныйРазвернутыйОстатокДт': 'summa_konechnyy_razvernutyy_ostatok_dt', 'СуммаКонечныйРазвернутыйОстатокКт': 'summa_konechnyy_razvernutyy_ostatok_kt', #ID 'ИдентификаторДоговора': 'uid_subkonto2', 'ИдентификаторКонтрагента': 'uid_subkonto1', 'ИдентификаторКлиента': 'uid_organizaciya', #DATE 'ДатаОтчета': 'get_date' } df = df.rename(columns=field_mapping) # Удаляем дубликаты по ключевым полям перед вставкой conflict_columns = ['schet', 'uid_subkonto2', 'uid_subkonto1', 'uid_organizaciya', 'get_date'] print(f" Данные перед удалением дубликатов (первые 10 строк):") print(f" Всего строк: {len(df)}") print(f" Ключевые колонки для проверки дубликатов: {conflict_columns}") # Выводим первые 10 строк с ключевыми полями if not df.empty: display_columns = conflict_columns + ['summa_oborot', 'nomer'] # Добавляем еще пару полей для информации available_columns = [col for col in display_columns if col in df.columns] print(f" Первые 10 строк (только ключевые поля):") print(df[available_columns].head(10).to_string(index=False)) # Проверяем наличие дубликатов duplicates = df.duplicated(subset=conflict_columns, keep=False) if duplicates.any(): duplicate_count = duplicates.sum() print(f" Найдено дубликатов: {duplicate_count}") print(f" Пример дублирующихся строк:") duplicate_samples = df[duplicates][available_columns].head(5) print(duplicate_samples.to_string(index=False)) else: print(f" Дубликатов не найдено") with engine.begin() as conn: if not df.empty: conn.execute(f"DROP TABLE IF EXISTS {temp_table_name}") conn.execute(f"CREATE TEMP TABLE {temp_table_name} (LIKE public.{table_name})") df.to_sql( f'temp_{table_name}', con=conn, if_exists='append', index=False, method='multi' ) # conn.execute(f"DELETE FROM public.{table_name} where get_date = (SELECT DISTINCT get_date FROM temp_{table_name})") conn.execute(f""" INSERT INTO public.{table_name} SELECT schet , subkonto1 , subkonto2 , organizaciya , case when nomer = '' then null else nomer end nomer , case when TO_DATE(date_begin, 'DD.MM.YYYY') = '0001-01-01' then null else date_begin end date_begin , case when TO_DATE(date_end, 'DD.MM.YYYY') = '0001-01-01' then null else date_end end date_end , inn_subkonto1 as inn_subkonto1 , inn_organizaciya as inn_organizaciya , summa_oborot , summa_oborot_dt , summa_oborot_kt , summa_konechnyy_ostatok , summa_konechnyy_ostatok_dt , summa_konechnyy_ostatok_kt , summa_konechnyy_razvernutyy_ostatok_dt , summa_konechnyy_razvernutyy_ostatok_kt , uid_subkonto2 , uid_subkonto1 , uid_organizaciya , CURRENT_DATE::date::timestamp FROM temp_{table_name} ON CONFLICT (schet, uid_subkonto2, uid_subkonto1, uid_organizaciya, get_date) DO UPDATE SET subkonto1 = EXCLUDED.subkonto1, subkonto2 = EXCLUDED.subkonto2, organizaciya = EXCLUDED.organizaciya, nomer = EXCLUDED.nomer, date_begin = EXCLUDED.date_begin, date_end = EXCLUDED.date_end, inn_subkonto1 = EXCLUDED.inn_subkonto1, inn_organizaciya = EXCLUDED.inn_organizaciya, summa_oborot = EXCLUDED.summa_oborot, summa_oborot_dt = EXCLUDED.summa_oborot_dt, summa_oborot_kt = EXCLUDED.summa_oborot_kt, summa_konechnyy_ostatok = EXCLUDED.summa_konechnyy_ostatok, summa_konechnyy_ostatok_dt = EXCLUDED.summa_konechnyy_ostatok_dt, summa_konechnyy_ostatok_kt = EXCLUDED.summa_konechnyy_ostatok_kt, summa_konechnyy_razvernutyy_ostatok_dt = EXCLUDED.summa_konechnyy_razvernutyy_ostatok_dt, summa_konechnyy_razvernutyy_ostatok_kt = EXCLUDED.summa_konechnyy_razvernutyy_ostatok_kt """ ) with DAG( dag_id='data_download_from_1C_source', default_args=default_args, description='Выгрузка данных из 1С', schedule_interval=None, #"0,30 01-10 * * *", catchup=False, tags=['sigma'], ) as dag: read_data_1C_task = PythonOperator( task_id="read_data_1C", python_callable=read_data_1C, provide_context=True ) read_data_1C_task