From b6c516037caaec00f246806618004fefc249042a Mon Sep 17 00:00:00 2001 From: bn_user Date: Sun, 30 Nov 2025 17:46:51 +0000 Subject: [PATCH] =?UTF-8?q?=D0=94=D0=BE=D0=B1=D0=B0=D0=B2=D0=B8=D1=82?= =?UTF-8?q?=D1=8C=20dags/OSV=5Fwith=5Fdocs.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dags/OSV_with_docs.py | 268 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 268 insertions(+) create mode 100644 dags/OSV_with_docs.py diff --git a/dags/OSV_with_docs.py b/dags/OSV_with_docs.py new file mode 100644 index 0000000..31d8cd3 --- /dev/null +++ b/dags/OSV_with_docs.py @@ -0,0 +1,268 @@ +import requests +import json +import pandas as pd +import numpy as np +from datetime import datetime, timedelta +from requests.auth import HTTPBasicAuth +from sqlalchemy import create_engine +from airflow import DAG +from airflow.operators.python import PythonOperator + +default_args = { + 'owner': 'airflow', + 'depends_on_past': False, + 'start_date': datetime(2023, 1, 1), + 'retries': 1, +} + +def get_db_engine(): + """Создает подключение к PostgreSQL""" + DF_CONFIG = { + 'dbname': "postgres", + 'user': "postgres", + 'password': "4a00d4b90cd830da0796", + 'host': "postgresql", + 'port': "5432" + } + return create_engine( + f"postgresql+psycopg2://{DF_CONFIG['user']}:{DF_CONFIG['password']}@" + f"{DF_CONFIG['host']}:{DF_CONFIG['port']}/{DF_CONFIG['dbname']}", + pool_size=10, + max_overflow=20 + ) + +def read_data_1C(**kwargs): + """ + Создает DataFrame с ежедневным календарем и останавливается на дате T-2 + + Параметры: + start_year (int): начальный год (по умолчанию 2019) + end_year (int): конечный год (по умолчанию 2025) + + Возвращает: + pandas.DataFrame: DataFrame с колонками Год, Месяц, День + """ + start_year = 2019 + # Вычисляем дату T-2 (текущая дата минус 2 дня) + t_minus_2 = datetime.now() - timedelta(days=2) + t_minus_2 = t_minus_2.replace(hour=0, minute=0, second=0, microsecond=0) + + print(f"Текущая дата: {datetime.now().strftime('%Y-%m-%d')}") + print(f"Дата T-2: {t_minus_2.strftime('%Y-%m-%d')}") + # Создаем диапазон дат от начала до T-2 + start_date = f'{start_year}-01-01' + + date_range = pd.date_range(start=start_date, end=t_minus_2, freq='D') + + # Создаем пустой список для хранения данных + calendar_data = [] + params = {"СписокСчетов": ["66","66.01","66.02", "66.03","66.04","66.21","66.22","66.23","66.24", "67","67.01","67.02", "67.03","67.04","67.21","67.22","67.23","67.24"]} + engine = get_db_engine() + table_name = 'oborotno_salbdovaya_vedomostb' + temp_table_name = f'temp_{table_name}' + # Проходим циклом по каждому дню + print("\nПрохождение по дням календаря:") + for i, date in enumerate(date_range): + print(f'{date}') + query = f"""ВЫБРАТЬ + ОстаткиОбороты.Счет, + ОстаткиОбороты.Субконто1, + ОстаткиОбороты.Субконто2, + ОстаткиОбороты.Организация, + ОстаткиОбороты.Субконто2.Номер КАК НомерДоговора, + ОстаткиОбороты.Субконто2.Дата КАК ДатаДоговора, + ОстаткиОбороты.Субконто2.СрокДействия КАК СрокДействияДоговора, + ОстаткиОбороты.Субконто1.Инн КАК ИннКонтрагента, + ОстаткиОбороты.Организация.Инн КАК ИннКлиента, + ОстаткиОбороты.СуммаОборот, + ОстаткиОбороты.СуммаОборотДт, + ОстаткиОбороты.СуммаОборотКт, + ОстаткиОбороты.СуммаКонечныйОстаток, + ОстаткиОбороты.СуммаКонечныйОстатокДт, + ОстаткиОбороты.СуммаКонечныйОстатокКт, + ОстаткиОбороты.СуммаКонечныйРазвернутыйОстатокДт, + ОстаткиОбороты.СуммаКонечныйРазвернутыйОстатокКт, + UUID(ОстаткиОбороты.Субконто2.Ссылка) КАК ИдентификаторДоговора, + UUID(ОстаткиОбороты.Субконто1.Ссылка) КАК ИдентификаторКонтрагента, + UUID(ОстаткиОбороты.Организация.Ссылка) КАК ИдентификаторКлиента, + ОстаткиОбороты.Период КАК ДатаОтчета + ИЗ + РегистрБухгалтерии.Хозрасчетный.ОстаткиИОбороты( ДАТАВРЕМЯ({date.year},{date.month},{date.day}), ДАТАВРЕМЯ({date.year},{date.month},{date.day}), Регистратор, , Счет.Код в (&СписокСчетов), , ) КАК ОстаткиОбороты + """ + + auth = HTTPBasicAuth('obmen', 'bOR2W7w4') + response = requests.post( + # http://адрес сервера/название БД/hs/services/query?ProcessPostedRequest + + url=r'http://192.168.1.75/chupd/hs/services/query?ProcessPostedRequest', + json={"query":query, "params": params}, + auth=auth, + verify=False + ) + + data_from_1c = response.json() + df = pd.DataFrame(data_from_1c['data']) + + field_mapping = { + # Основные поля + 'Счет': 'schet', + 'Субконто1': 'subkonto1', + 'Субконто2': 'subkonto2', + 'Организация': 'organizaciya', + 'НомерДоговора': 'nomer', + 'ДатаДоговора': 'date_begin', + 'СрокДействияДоговора': 'date_end', + 'ИннКонтрагента': 'inn_subkonto1', + 'ИннКлиента': 'inn_organizaciya', + + # Суммовые остатки и обороты + 'СуммаОборот': 'summa_oborot', + 'СуммаОборотДт': 'summa_oborot_dt', + 'СуммаОборотКт': 'summa_oborot_kt', + 'СуммаКонечныйОстаток': 'summa_konechnyy_ostatok', + 'СуммаКонечныйОстатокДт': 'summa_konechnyy_ostatok_dt', + 'СуммаКонечныйОстатокКт': 'summa_konechnyy_ostatok_kt', + 'СуммаКонечныйРазвернутыйОстатокДт': 'summa_konechnyy_razvernutyy_ostatok_dt', + 'СуммаКонечныйРазвернутыйОстатокКт': 'summa_konechnyy_razvernutyy_ostatok_kt', + + #ID + 'ИдентификаторДоговора': 'uid_subkonto2', + 'ИдентификаторКонтрагента': 'uid_subkonto1', + 'ИдентификаторКлиента': 'uid_organizaciya', + + #DATE + 'ДатаОтчета': 'get_date' + } + + df = df.rename(columns=field_mapping) + # Удаляем дубликаты по ключевым полям перед вставкой + group_columns = [ + 'schet', 'subkonto1', 'subkonto2', 'organizaciya', 'nomer', + 'date_begin', 'date_end', 'inn_subkonto1', 'inn_organizaciya', + 'uid_subkonto2', 'uid_subkonto1', 'uid_organizaciya', 'get_date' + ] + columns_for_sum = [ + 'summa_oborot', 'summa_oborot_dt', 'summa_oborot_kt', + 'summa_konechnyy_ostatok', 'summa_konechnyy_ostatok_dt', 'summa_konechnyy_ostatok_kt', + 'summa_konechnyy_razvernutyy_ostatok_dt', 'summa_konechnyy_razvernutyy_ostatok_kt' + ] + sum_columns = { + 'summa_oborot': 'sum', 'summa_oborot_dt': 'sum', 'summa_oborot_kt': 'sum', + 'summa_konechnyy_ostatok': 'sum', 'summa_konechnyy_ostatok_dt': 'sum', 'summa_konechnyy_ostatok_kt': 'sum', + 'summa_konechnyy_razvernutyy_ostatok_dt': 'sum', 'summa_konechnyy_razvernutyy_ostatok_kt': 'sum' + } + for col in columns_for_sum: + if col in df.columns: + # Очищаем и преобразуем числовые значения + df[col] = (df[col] + .str.replace('\xa0', '', regex=False) # Убираем неразрывные пробелы + .str.replace(' ', '', regex=False) # Убираем обычные пробелы + .str.replace(',', '.', regex=False) # Запятые -> точки для десятичных + .apply(lambda x: x if x.replace('.', '').replace('-', '').isdigit() else None) + .apply(pd.to_numeric, errors='coerce')) + + # Диагностика + original = df[col].copy() + cleaned = df[col] + failed = cleaned.isna() & original.notna() + + if failed.any(): + display(f" {col}: не преобразовано {failed.sum()} значений") + display(f" Примеры: {original[failed].head(3).tolist()}") + + df = df.groupby(group_columns, as_index=False).agg(sum_columns) + + # Преобразуем даты из формата DD.MM.YYYY HH:MM:SS в datetime + date_columns = ['date_begin', 'date_end', 'get_date'] + + for col in date_columns: + if col in df.columns: + # Пробуем разные форматы дат + df[col] = pd.to_datetime(df[col], format='%d.%m.%Y %H:%M:%S', errors='coerce') + + # Если не сработало, пробуем без времени + if df[col].isna().any(): + df[col] = pd.to_datetime(df[col], format='%d.%m.%Y', errors='coerce') + + # Убедимся, что все даты преобразованы + for col in date_columns: + if col in df.columns: + failed_count = df[col].isna().sum() + if failed_count > 0: + print(f" Внимание: в колонке '{col}' осталось {failed_count} некорректных дат") + + + with engine.begin() as conn: + if not df.empty: + conn.execute(f"DROP TABLE IF EXISTS {temp_table_name}") + conn.execute(f"CREATE TEMP TABLE {temp_table_name} (LIKE public.{table_name})") + df.to_sql( + f'temp_{table_name}', + con=conn, + if_exists='append', + index=False, + method='multi' + ) + # conn.execute(f"DELETE FROM public.{table_name} where get_date = (SELECT DISTINCT get_date FROM temp_{table_name})") + conn.execute(f""" + INSERT INTO public.{table_name} + SELECT + schet + , subkonto1 + , subkonto2 + , organizaciya + , case when nomer = '' then null else nomer end nomer + , case when date_begin = '0001-01-01' then null else date_begin end date_begin + , case when date_begin = '0001-01-01' then null else date_end end date_end + , inn_subkonto1 as inn_subkonto1 + , inn_organizaciya as inn_organizaciya + , summa_oborot + , summa_oborot_dt + , summa_oborot_kt + , summa_konechnyy_ostatok + , summa_konechnyy_ostatok_dt + , summa_konechnyy_ostatok_kt + , summa_konechnyy_razvernutyy_ostatok_dt + , summa_konechnyy_razvernutyy_ostatok_kt + , uid_subkonto2 + , uid_subkonto1 + , uid_organizaciya + , get_date::date::timestamp + FROM temp_{table_name} + ON CONFLICT (schet, uid_subkonto2, uid_subkonto1, uid_organizaciya, get_date) + DO UPDATE SET + subkonto1 = EXCLUDED.subkonto1, + subkonto2 = EXCLUDED.subkonto2, + organizaciya = EXCLUDED.organizaciya, + nomer = EXCLUDED.nomer, + date_begin = EXCLUDED.date_begin, + date_end = EXCLUDED.date_end, + inn_subkonto1 = EXCLUDED.inn_subkonto1, + inn_organizaciya = EXCLUDED.inn_organizaciya, + summa_oborot = EXCLUDED.summa_oborot, + summa_oborot_dt = EXCLUDED.summa_oborot_dt, + summa_oborot_kt = EXCLUDED.summa_oborot_kt, + summa_konechnyy_ostatok = EXCLUDED.summa_konechnyy_ostatok, + summa_konechnyy_ostatok_dt = EXCLUDED.summa_konechnyy_ostatok_dt, + summa_konechnyy_ostatok_kt = EXCLUDED.summa_konechnyy_ostatok_kt, + summa_konechnyy_razvernutyy_ostatok_dt = EXCLUDED.summa_konechnyy_razvernutyy_ostatok_dt, + summa_konechnyy_razvernutyy_ostatok_kt = EXCLUDED.summa_konechnyy_razvernutyy_ostatok_kt + """ + ) + +with DAG( + dag_id='data_download_from_1C_source', + default_args=default_args, + description='Выгрузка данных из 1С', + schedule_interval=None, #"0,30 01-10 * * *", + catchup=False, + tags=['sigma'], +) as dag: + + read_data_1C_task = PythonOperator( + task_id="read_data_1C", + python_callable=read_data_1C, + provide_context=True + ) + +read_data_1C_task \ No newline at end of file