From b20a7484e5a3e52cee956ed159d3bcfc0e1e6560 Mon Sep 17 00:00:00 2001 From: bn_user Date: Sun, 30 Nov 2025 18:12:15 +0000 Subject: [PATCH] =?UTF-8?q?=D0=9E=D0=B1=D0=BD=D0=BE=D0=B2=D0=B8=D1=82?= =?UTF-8?q?=D1=8C=20dags/OSV=5Fwith=5Fdocs.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dags/OSV_with_docs.py | 354 ++++++++++++++++++++---------------------- 1 file changed, 171 insertions(+), 183 deletions(-) diff --git a/dags/OSV_with_docs.py b/dags/OSV_with_docs.py index 31d8cd3..dd200ef 100644 --- a/dags/OSV_with_docs.py +++ b/dags/OSV_with_docs.py @@ -52,203 +52,191 @@ def read_data_1C(**kwargs): # Создаем диапазон дат от начала до T-2 start_date = f'{start_year}-01-01' - date_range = pd.date_range(start=start_date, end=t_minus_2, freq='D') + # date_range = pd.date_range(start=start_date, end=t_minus_2, freq='D') # Создаем пустой список для хранения данных calendar_data = [] params = {"СписокСчетов": ["66","66.01","66.02", "66.03","66.04","66.21","66.22","66.23","66.24", "67","67.01","67.02", "67.03","67.04","67.21","67.22","67.23","67.24"]} engine = get_db_engine() - table_name = 'oborotno_salbdovaya_vedomostb' + table_name = 'oborotno_salbdovaya_vedomostb_with_docs' temp_table_name = f'temp_{table_name}' # Проходим циклом по каждому дню - print("\nПрохождение по дням календаря:") - for i, date in enumerate(date_range): - print(f'{date}') - query = f"""ВЫБРАТЬ - ОстаткиОбороты.Счет, - ОстаткиОбороты.Субконто1, - ОстаткиОбороты.Субконто2, - ОстаткиОбороты.Организация, - ОстаткиОбороты.Субконто2.Номер КАК НомерДоговора, - ОстаткиОбороты.Субконто2.Дата КАК ДатаДоговора, - ОстаткиОбороты.Субконто2.СрокДействия КАК СрокДействияДоговора, - ОстаткиОбороты.Субконто1.Инн КАК ИннКонтрагента, - ОстаткиОбороты.Организация.Инн КАК ИннКлиента, - ОстаткиОбороты.СуммаОборот, - ОстаткиОбороты.СуммаОборотДт, - ОстаткиОбороты.СуммаОборотКт, - ОстаткиОбороты.СуммаКонечныйОстаток, - ОстаткиОбороты.СуммаКонечныйОстатокДт, - ОстаткиОбороты.СуммаКонечныйОстатокКт, - ОстаткиОбороты.СуммаКонечныйРазвернутыйОстатокДт, - ОстаткиОбороты.СуммаКонечныйРазвернутыйОстатокКт, - UUID(ОстаткиОбороты.Субконто2.Ссылка) КАК ИдентификаторДоговора, - UUID(ОстаткиОбороты.Субконто1.Ссылка) КАК ИдентификаторКонтрагента, - UUID(ОстаткиОбороты.Организация.Ссылка) КАК ИдентификаторКлиента, - ОстаткиОбороты.Период КАК ДатаОтчета - ИЗ - РегистрБухгалтерии.Хозрасчетный.ОстаткиИОбороты( ДАТАВРЕМЯ({date.year},{date.month},{date.day}), ДАТАВРЕМЯ({date.year},{date.month},{date.day}), Регистратор, , Счет.Код в (&СписокСчетов), , ) КАК ОстаткиОбороты - """ + # print("\nПрохождение по дням календаря:") + # for i, date in enumerate(date_range): + # print(f'{date}') + query = f"""ВЫБРАТЬ + ОстаткиОбороты.Счет, + ОстаткиОбороты.Регистратор, + ОстаткиОбороты.Субконто1, + ОстаткиОбороты.Субконто2, + ОстаткиОбороты.Организация, + ОстаткиОбороты.Субконто2.Номер КАК НомерДоговора, + ОстаткиОбороты.Субконто2.Дата КАК ДатаДоговора, + ОстаткиОбороты.Субконто2.СрокДействия КАК СрокДействияДоговора, + ОстаткиОбороты.Субконто1.Инн КАК ИннКонтрагента, + ОстаткиОбороты.Организация.Инн КАК ИннКлиента, + ОстаткиОбороты.СуммаОборот, + ОстаткиОбороты.СуммаОборотДт, + ОстаткиОбороты.СуммаОборотКт, + ОстаткиОбороты.СуммаКонечныйОстаток, + ОстаткиОбороты.СуммаКонечныйОстатокДт, + ОстаткиОбороты.СуммаКонечныйОстатокКт, + ОстаткиОбороты.СуммаКонечныйРазвернутыйОстатокДт, + ОстаткиОбороты.СуммаКонечныйРазвернутыйОстатокКт, + UUID(ОстаткиОбороты.Регистратор.ССылка) КАК ИдентификаторРегистратора, + UUID(ОстаткиОбороты.Субконто2.Ссылка) КАК ИдентификаторДоговора, + UUID(ОстаткиОбороты.Субконто1.Ссылка) КАК ИдентификаторКонтрагента, + UUID(ОстаткиОбороты.Организация.Ссылка) КАК ИдентификаторКлиента, + ОстаткиОбороты.Период КАК ДатаОтчета +ИЗ + РегистрБухгалтерии.Хозрасчетный.ОстаткиИОбороты( , ДАТАВРЕМЯ({t_minus_2.year},{t_minus_2.month},{t_minus_2.day}), Регистратор, , Счет.Код в (&СписокСчетов), , ) КАК ОстаткиОбороты + """ +# РегистрБухгалтерии.Хозрасчетный.ОстаткиИОбороты( ДАТАВРЕМЯ({date.year},{date.month},{date.day}), ДАТАВРЕМЯ({date.year},{date.month},{date.day}), Регистратор, , Счет.Код в (&СписокСчетов), , ) КАК ОстаткиОбороты + auth = HTTPBasicAuth('obmen', 'bOR2W7w4') + response = requests.post( + # http://адрес сервера/название БД/hs/services/query?ProcessPostedRequest - auth = HTTPBasicAuth('obmen', 'bOR2W7w4') - response = requests.post( - # http://адрес сервера/название БД/hs/services/query?ProcessPostedRequest + url=r'http://192.168.1.75/chupd/hs/services/query?ProcessPostedRequest', + json={"query":query, "params": params}, + auth=auth, + verify=False + ) - url=r'http://192.168.1.75/chupd/hs/services/query?ProcessPostedRequest', - json={"query":query, "params": params}, - auth=auth, - verify=False - ) + data_from_1c = response.json() + df = pd.DataFrame(data_from_1c['data']) - data_from_1c = response.json() - df = pd.DataFrame(data_from_1c['data']) - - field_mapping = { - # Основные поля - 'Счет': 'schet', - 'Субконто1': 'subkonto1', - 'Субконто2': 'subkonto2', - 'Организация': 'organizaciya', - 'НомерДоговора': 'nomer', - 'ДатаДоговора': 'date_begin', - 'СрокДействияДоговора': 'date_end', - 'ИннКонтрагента': 'inn_subkonto1', - 'ИннКлиента': 'inn_organizaciya', - - # Суммовые остатки и обороты - 'СуммаОборот': 'summa_oborot', - 'СуммаОборотДт': 'summa_oborot_dt', - 'СуммаОборотКт': 'summa_oborot_kt', - 'СуммаКонечныйОстаток': 'summa_konechnyy_ostatok', - 'СуммаКонечныйОстатокДт': 'summa_konechnyy_ostatok_dt', - 'СуммаКонечныйОстатокКт': 'summa_konechnyy_ostatok_kt', - 'СуммаКонечныйРазвернутыйОстатокДт': 'summa_konechnyy_razvernutyy_ostatok_dt', - 'СуммаКонечныйРазвернутыйОстатокКт': 'summa_konechnyy_razvernutyy_ostatok_kt', - - #ID - 'ИдентификаторДоговора': 'uid_subkonto2', - 'ИдентификаторКонтрагента': 'uid_subkonto1', - 'ИдентификаторКлиента': 'uid_organizaciya', - - #DATE - 'ДатаОтчета': 'get_date' - } - - df = df.rename(columns=field_mapping) - # Удаляем дубликаты по ключевым полям перед вставкой - group_columns = [ - 'schet', 'subkonto1', 'subkonto2', 'organizaciya', 'nomer', - 'date_begin', 'date_end', 'inn_subkonto1', 'inn_organizaciya', - 'uid_subkonto2', 'uid_subkonto1', 'uid_organizaciya', 'get_date' - ] - columns_for_sum = [ - 'summa_oborot', 'summa_oborot_dt', 'summa_oborot_kt', - 'summa_konechnyy_ostatok', 'summa_konechnyy_ostatok_dt', 'summa_konechnyy_ostatok_kt', - 'summa_konechnyy_razvernutyy_ostatok_dt', 'summa_konechnyy_razvernutyy_ostatok_kt' - ] - sum_columns = { - 'summa_oborot': 'sum', 'summa_oborot_dt': 'sum', 'summa_oborot_kt': 'sum', - 'summa_konechnyy_ostatok': 'sum', 'summa_konechnyy_ostatok_dt': 'sum', 'summa_konechnyy_ostatok_kt': 'sum', - 'summa_konechnyy_razvernutyy_ostatok_dt': 'sum', 'summa_konechnyy_razvernutyy_ostatok_kt': 'sum' - } - for col in columns_for_sum: - if col in df.columns: - # Очищаем и преобразуем числовые значения - df[col] = (df[col] - .str.replace('\xa0', '', regex=False) # Убираем неразрывные пробелы - .str.replace(' ', '', regex=False) # Убираем обычные пробелы - .str.replace(',', '.', regex=False) # Запятые -> точки для десятичных - .apply(lambda x: x if x.replace('.', '').replace('-', '').isdigit() else None) - .apply(pd.to_numeric, errors='coerce')) - - # Диагностика - original = df[col].copy() - cleaned = df[col] - failed = cleaned.isna() & original.notna() - - if failed.any(): - display(f" {col}: не преобразовано {failed.sum()} значений") - display(f" Примеры: {original[failed].head(3).tolist()}") + field_mapping = { + # Основные поля + 'Счет': 'schet', + 'Регистратор': 'registrator', + 'Субконто1': 'subkonto1', + 'Субконто2': 'subkonto2', + 'Организация': 'organizaciya', + 'НомерДоговора': 'nomer', + 'ДатаДоговора': 'date_begin', + 'СрокДействияДоговора': 'date_end', + 'ИннКонтрагента': 'inn_subkonto1', + 'ИннКлиента': 'inn_organizaciya', - df = df.groupby(group_columns, as_index=False).agg(sum_columns) + # Суммовые остатки и обороты + 'СуммаОборот': 'summa_oborot', + 'СуммаОборотДт': 'summa_oborot_dt', + 'СуммаОборотКт': 'summa_oborot_kt', + 'СуммаКонечныйОстаток': 'summa_konechnyy_ostatok', + 'СуммаКонечныйОстатокДт': 'summa_konechnyy_ostatok_dt', + 'СуммаКонечныйОстатокКт': 'summa_konechnyy_ostatok_kt', + 'СуммаКонечныйРазвернутыйОстатокДт': 'summa_konechnyy_razvernutyy_ostatok_dt', + 'СуммаКонечныйРазвернутыйОстатокКт': 'summa_konechnyy_razvernutyy_ostatok_kt', - # Преобразуем даты из формата DD.MM.YYYY HH:MM:SS в datetime - date_columns = ['date_begin', 'date_end', 'get_date'] + #ID + 'ИдентификаторРегистратора': 'uid_registrator', + 'ИдентификаторДоговора': 'uid_subkonto2', + 'ИдентификаторКонтрагента': 'uid_subkonto1', + 'ИдентификаторКлиента': 'uid_organizaciya', - for col in date_columns: - if col in df.columns: - # Пробуем разные форматы дат - df[col] = pd.to_datetime(df[col], format='%d.%m.%Y %H:%M:%S', errors='coerce') - - # Если не сработало, пробуем без времени - if df[col].isna().any(): - df[col] = pd.to_datetime(df[col], format='%d.%m.%Y', errors='coerce') + #DATE + 'Период': 'get_date' + } - # Убедимся, что все даты преобразованы - for col in date_columns: - if col in df.columns: - failed_count = df[col].isna().sum() - if failed_count > 0: - print(f" Внимание: в колонке '{col}' осталось {failed_count} некорректных дат") - - - with engine.begin() as conn: - if not df.empty: - conn.execute(f"DROP TABLE IF EXISTS {temp_table_name}") - conn.execute(f"CREATE TEMP TABLE {temp_table_name} (LIKE public.{table_name})") - df.to_sql( - f'temp_{table_name}', - con=conn, - if_exists='append', - index=False, - method='multi' - ) - # conn.execute(f"DELETE FROM public.{table_name} where get_date = (SELECT DISTINCT get_date FROM temp_{table_name})") - conn.execute(f""" - INSERT INTO public.{table_name} - SELECT - schet - , subkonto1 - , subkonto2 - , organizaciya - , case when nomer = '' then null else nomer end nomer - , case when date_begin = '0001-01-01' then null else date_begin end date_begin - , case when date_begin = '0001-01-01' then null else date_end end date_end - , inn_subkonto1 as inn_subkonto1 - , inn_organizaciya as inn_organizaciya - , summa_oborot - , summa_oborot_dt - , summa_oborot_kt - , summa_konechnyy_ostatok - , summa_konechnyy_ostatok_dt - , summa_konechnyy_ostatok_kt - , summa_konechnyy_razvernutyy_ostatok_dt - , summa_konechnyy_razvernutyy_ostatok_kt - , uid_subkonto2 - , uid_subkonto1 - , uid_organizaciya - , get_date::date::timestamp - FROM temp_{table_name} - ON CONFLICT (schet, uid_subkonto2, uid_subkonto1, uid_organizaciya, get_date) - DO UPDATE SET - subkonto1 = EXCLUDED.subkonto1, - subkonto2 = EXCLUDED.subkonto2, - organizaciya = EXCLUDED.organizaciya, - nomer = EXCLUDED.nomer, - date_begin = EXCLUDED.date_begin, - date_end = EXCLUDED.date_end, - inn_subkonto1 = EXCLUDED.inn_subkonto1, - inn_organizaciya = EXCLUDED.inn_organizaciya, - summa_oborot = EXCLUDED.summa_oborot, - summa_oborot_dt = EXCLUDED.summa_oborot_dt, - summa_oborot_kt = EXCLUDED.summa_oborot_kt, - summa_konechnyy_ostatok = EXCLUDED.summa_konechnyy_ostatok, - summa_konechnyy_ostatok_dt = EXCLUDED.summa_konechnyy_ostatok_dt, - summa_konechnyy_ostatok_kt = EXCLUDED.summa_konechnyy_ostatok_kt, - summa_konechnyy_razvernutyy_ostatok_dt = EXCLUDED.summa_konechnyy_razvernutyy_ostatok_dt, - summa_konechnyy_razvernutyy_ostatok_kt = EXCLUDED.summa_konechnyy_razvernutyy_ostatok_kt - """ - ) + df = df.rename(columns=field_mapping) + # # Удаляем дубликаты по ключевым полям перед вставкой + # group_columns = [ + # 'schet', 'subkonto1', 'subkonto2', 'organizaciya', 'nomer', + # 'date_begin', 'date_end', 'inn_subkonto1', 'inn_organizaciya', + # 'uid_subkonto2', 'uid_subkonto1', 'uid_organizaciya', 'get_date' + # ] + # columns_for_sum = [ + # 'summa_oborot', 'summa_oborot_dt', 'summa_oborot_kt', + # 'summa_konechnyy_ostatok', 'summa_konechnyy_ostatok_dt', 'summa_konechnyy_ostatok_kt', + # 'summa_konechnyy_razvernutyy_ostatok_dt', 'summa_konechnyy_razvernutyy_ostatok_kt' + # ] + # sum_columns = { + # 'summa_oborot': 'sum', 'summa_oborot_dt': 'sum', 'summa_oborot_kt': 'sum', + # 'summa_konechnyy_ostatok': 'sum', 'summa_konechnyy_ostatok_dt': 'sum', 'summa_konechnyy_ostatok_kt': 'sum', + # 'summa_konechnyy_razvernutyy_ostatok_dt': 'sum', 'summa_konechnyy_razvernutyy_ostatok_kt': 'sum' + # } + # for col in columns_for_sum: + # if col in df.columns: + # # Очищаем и преобразуем числовые значения + # df[col] = (df[col] + # .str.replace('\xa0', '', regex=False) # Убираем неразрывные пробелы + # .str.replace(' ', '', regex=False) # Убираем обычные пробелы + # .str.replace(',', '.', regex=False) # Запятые -> точки для десятичных + # .apply(lambda x: x if x.replace('.', '').replace('-', '').isdigit() else None) + # .apply(pd.to_numeric, errors='coerce')) + + # # Диагностика + # original = df[col].copy() + # cleaned = df[col] + # failed = cleaned.isna() & original.notna() + + # if failed.any(): + # display(f" {col}: не преобразовано {failed.sum()} значений") + # display(f" Примеры: {original[failed].head(3).tolist()}") + + # df = df.groupby(group_columns, as_index=False).agg(sum_columns) + + # Преобразуем даты из формата DD.MM.YYYY HH:MM:SS в datetime + date_columns = ['date_begin', 'date_end', 'get_date'] + + for col in date_columns: + if col in df.columns: + # Пробуем разные форматы дат + df[col] = pd.to_datetime(df[col], format='%d.%m.%Y %H:%M:%S', errors='coerce') + + # Если не сработало, пробуем без времени + if df[col].isna().any(): + df[col] = pd.to_datetime(df[col], format='%d.%m.%Y', errors='coerce') + + # Убедимся, что все даты преобразованы + for col in date_columns: + if col in df.columns: + failed_count = df[col].isna().sum() + if failed_count > 0: + print(f" Внимание: в колонке '{col}' осталось {failed_count} некорректных дат") + + + with engine.begin() as conn: + if not df.empty: + conn.execute(f"DROP TABLE IF EXISTS {temp_table_name}") + conn.execute(f"CREATE TEMP TABLE {temp_table_name} (LIKE public.{table_name})") + df.to_sql( + f'temp_{table_name}', + con=conn, + if_exists='append', + index=False, + method='multi' + ) + conn.execute(f"TRUNCATE TABLE public.{table_name}") + conn.execute(f""" + INSERT INTO public.{table_name} + SELECT + schet + , registrator + , subkonto1 + , subkonto2 + , organizaciya + , case when nomer = '' then null else nomer end nomer + , case when date_begin = '0001-01-01' then null else date_begin end date_begin + , case when date_begin = '0001-01-01' then null else date_end end date_end + , inn_subkonto1 as inn_subkonto1 + , inn_organizaciya as inn_organizaciya + , summa_oborot + , summa_oborot_dt + , summa_oborot_kt + , summa_konechnyy_ostatok + , summa_konechnyy_ostatok_dt + , summa_konechnyy_ostatok_kt + , summa_konechnyy_razvernutyy_ostatok_dt + , summa_konechnyy_razvernutyy_ostatok_kt + , uid_registrator + , uid_subkonto2 + , uid_subkonto1 + , uid_organizaciya + , get_date::date::timestamp + FROM temp_{table_name} + """ + ) with DAG( dag_id='data_download_from_1C_source',