From c127df2687a4d44804ffc0b44f93b090e6aca497 Mon Sep 17 00:00:00 2001 From: bn_user Date: Tue, 18 Nov 2025 16:59:32 +0000 Subject: [PATCH] =?UTF-8?q?=D0=9E=D0=B1=D0=BD=D0=BE=D0=B2=D0=B8=D1=82?= =?UTF-8?q?=D1=8C=20dags/OSV.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dags/OSV.py | 256 +++++++++++++++++++++++++++++++--------------------- 1 file changed, 151 insertions(+), 105 deletions(-) diff --git a/dags/OSV.py b/dags/OSV.py index 1b0bdda..e6c5ea8 100644 --- a/dags/OSV.py +++ b/dags/OSV.py @@ -2,7 +2,7 @@ import requests import json import pandas as pd import numpy as np -from datetime import datetime +from datetime import datetime, timedelta from requests.auth import HTTPBasicAuth from sqlalchemy import create_engine from airflow import DAG @@ -32,117 +32,163 @@ def get_db_engine(): ) def read_data_1C(**kwargs): - params = {"СписокСчетов": ["66","66.01","66.02", "66.03","66.04","66.21","66.22","66.23","66.24", "67","67.01","67.02", "67.03","67.04","67.21","67.22","67.23","67.24"]} - query = """ВЫБРАТЬ - ОстаткиОбороты.Счет, - ОстаткиОбороты.Субконто1, - ОстаткиОбороты.Субконто2, - ОстаткиОбороты.Организация, - ОстаткиОбороты.Субконто2.Номер КАК НомерДоговора, - ОстаткиОбороты.Субконто2.Дата КАК ДатаДоговора, - ОстаткиОбороты.Субконто2.СрокДействия КАК СрокДействияДоговора, - ОстаткиОбороты.Субконто1.Инн КАК ИннКонтрагента, - ОстаткиОбороты.Организация.Инн КАК ИннКлиента, - ОстаткиОбороты.СуммаОборот, - ОстаткиОбороты.СуммаОборотДт, - ОстаткиОбороты.СуммаОборотКт, - ОстаткиОбороты.СуммаКонечныйОстаток, - ОстаткиОбороты.СуммаКонечныйОстатокДт, - ОстаткиОбороты.СуммаКонечныйОстатокКт, - ОстаткиОбороты.СуммаКонечныйРазвернутыйОстатокДт, - ОстаткиОбороты.СуммаКонечныйРазвернутыйОстатокКт, - UUID(ОстаткиОбороты.Субконто2.Ссылка) КАК ИдентификаторДоговора, - UUID(ОстаткиОбороты.Субконто1.Ссылка) КАК ИдентификаторКонтрагента, - UUID(ОстаткиОбороты.Организация.Ссылка) КАК ИдентификаторКлиента -ИЗ - РегистрБухгалтерии.Хозрасчетный.ОстаткиИОбороты КАК ОстаткиОбороты -ГДЕ - ОстаткиОбороты.Счет.Код В (&СписокСчетов) """ + Создает DataFrame с ежедневным календарем и останавливается на дате T-2 + + Параметры: + start_year (int): начальный год (по умолчанию 2019) + end_year (int): конечный год (по умолчанию 2025) + + Возвращает: + pandas.DataFrame: DataFrame с колонками Год, Месяц, День + """ + start_year = 2019 + # Вычисляем дату T-2 (текущая дата минус 2 дня) + t_minus_2 = datetime.now() - timedelta(days=2) + t_minus_2 = t_minus_2.replace(hour=0, minute=0, second=0, microsecond=0) - auth = HTTPBasicAuth('obmen', 'bOR2W7w4') - response = requests.post( - # http://адрес сервера/название БД/hs/services/query?ProcessPostedRequest + print(f"Текущая дата: {datetime.now().strftime('%Y-%m-%d')}") + print(f"Дата T-2: {t_minus_2.strftime('%Y-%m-%d')}") + # Создаем диапазон дат от начала до T-2 + start_date = f'{start_year}-01-01' + + date_range = pd.date_range(start=start_date, end=t_minus_2, freq='D') + + # Создаем пустой список для хранения данных + calendar_data = [] + params = {"СписокСчетов": ["66","66.01","66.02", "66.03","66.04","66.21","66.22","66.23","66.24", "67","67.01","67.02", "67.03","67.04","67.21","67.22","67.23","67.24"]} + # Проходим циклом по каждому дню + print("\nПрохождение по дням календаря:") + for i, date in enumerate(date_range): + query = f"""ВЫБРАТЬ + ОстаткиОбороты.Счет, + ОстаткиОбороты.Субконто1, + ОстаткиОбороты.Субконто2, + ОстаткиОбороты.Организация, + ОстаткиОбороты.Субконто2.Номер КАК НомерДоговора, + ОстаткиОбороты.Субконто2.Дата КАК ДатаДоговора, + ОстаткиОбороты.Субконто2.СрокДействия КАК СрокДействияДоговора, + ОстаткиОбороты.Субконто1.Инн КАК ИннКонтрагента, + ОстаткиОбороты.Организация.Инн КАК ИннКлиента, + ОстаткиОбороты.СуммаОборот, + ОстаткиОбороты.СуммаОборотДт, + ОстаткиОбороты.СуммаОборотКт, + ОстаткиОбороты.СуммаКонечныйОстаток, + ОстаткиОбороты.СуммаКонечныйОстатокДт, + ОстаткиОбороты.СуммаКонечныйОстатокКт, + ОстаткиОбороты.СуммаКонечныйРазвернутыйОстатокДт, + ОстаткиОбороты.СуммаКонечныйРазвернутыйОстатокКт, + UUID(ОстаткиОбороты.Субконто2.Ссылка) КАК ИдентификаторДоговора, + UUID(ОстаткиОбороты.Субконто1.Ссылка) КАК ИдентификаторКонтрагента, + UUID(ОстаткиОбороты.Организация.Ссылка) КАК ИдентификаторКлиента, + ОстаткиОбороты.Период КАК ДатаОтчета + ИЗ + РегистрБухгалтерии.Хозрасчетный.ОстаткиИОбороты( ДАТАВРЕМЯ({date.year},{date.month},{date.day}), ДАТАВРЕМЯ({date.year},{date.month},{date.day}), Регистратор, , Счет.Код в (&СписокСчетов), , ) КАК ОстаткиОбороты + """ - url=r'http://192.168.1.75/chupd/hs/services/query?ProcessPostedRequest', - json={"query":query, "params": params}, - auth=auth, - verify=False - ) + auth = HTTPBasicAuth('obmen', 'bOR2W7w4') + response = requests.post( + # http://адрес сервера/название БД/hs/services/query?ProcessPostedRequest - data_from_1c = response.json() - df = pd.DataFrame(data_from_1c['data']) - engine = get_db_engine() - table_name = 'oborotno_salbdovaya_vedomostb' + url=r'http://192.168.1.75/chupd/hs/services/query?ProcessPostedRequest', + json={"query":query, "params": params}, + auth=auth, + verify=False + ) - field_mapping = { - # Основные поля - 'Счет': 'schet', - 'Субконто1': 'subkonto1', - 'Субконто2': 'subkonto2', - 'Организация': 'organizaciya', - 'НомерДоговора': 'nomer', - 'ДатаДоговора': 'date_begin', - 'СрокДействияДоговора': 'date_end', - 'ИннКонтрагента': 'inn_subkonto1', - 'ИннКлиента': 'inn_organizaciya', - - # Суммовые остатки и обороты - 'СуммаОборот': 'summa_oborot', - 'СуммаОборотДт': 'summa_oborot_dt', - 'СуммаОборотКт': 'summa_oborot_kt', - 'СуммаКонечныйОстаток': 'summa_konechnyy_ostatok', - 'СуммаКонечныйОстатокДт': 'summa_konechnyy_ostatok_dt', - 'СуммаКонечныйОстатокКт': 'summa_konechnyy_ostatok_kt', - 'СуммаКонечныйРазвернутыйОстатокДт': 'summa_konechnyy_razvernutyy_ostatok_dt', - 'СуммаКонечныйРазвернутыйОстатокКт': 'summa_konechnyy_razvernutyy_ostatok_kt', + data_from_1c = response.json() + df = pd.DataFrame(data_from_1c['data']) + engine = get_db_engine() + table_name = 'oborotno_salbdovaya_vedomostb' - #ID - 'ИдентификаторДоговора': 'uid_subkonto2', - 'ИдентификаторКонтрагента': 'uid_subkonto1', - 'ИдентификаторКлиента': 'uid_organizaciya' - } + field_mapping = { + # Основные поля + 'Счет': 'schet', + 'Субконто1': 'subkonto1', + 'Субконто2': 'subkonto2', + 'Организация': 'organizaciya', + 'НомерДоговора': 'nomer', + 'ДатаДоговора': 'date_begin', + 'СрокДействияДоговора': 'date_end', + 'ИннКонтрагента': 'inn_subkonto1', + 'ИннКлиента': 'inn_organizaciya', + + # Суммовые остатки и обороты + 'СуммаОборот': 'summa_oborot', + 'СуммаОборотДт': 'summa_oborot_dt', + 'СуммаОборотКт': 'summa_oborot_kt', + 'СуммаКонечныйОстаток': 'summa_konechnyy_ostatok', + 'СуммаКонечныйОстатокДт': 'summa_konechnyy_ostatok_dt', + 'СуммаКонечныйОстатокКт': 'summa_konechnyy_ostatok_kt', + 'СуммаКонечныйРазвернутыйОстатокДт': 'summa_konechnyy_razvernutyy_ostatok_dt', + 'СуммаКонечныйРазвернутыйОстатокКт': 'summa_konechnyy_razvernutyy_ostatok_kt', - df = df.rename(columns=field_mapping) - with engine.begin() as conn: - if not df.empty: - conn.execute(f"CREATE TEMP TABLE temp_{table_name} AS SELECT * FROM public.{table_name} WHERE 1 = 0") - df.to_sql( - f'temp_{table_name}', - con=conn, - if_exists='append', - index=False, - method='multi' - ) - conn.execute(f"DELETE FROM public.{table_name} where get_date = CURRENT_DATE::date::timestamp") - conn.execute(f""" - INSERT INTO public.{table_name} - SELECT - schet - , subkonto1 - , subkonto2 - , organizaciya - , case when nomer = '' then null else nomer end nomer - , case when TO_DATE(date_begin, 'DD.MM.YYYY') = '0001-01-01' then null else date_begin end date_begin - , case when TO_DATE(date_end, 'DD.MM.YYYY') = '0001-01-01' then null else date_end end date_end - , inn_subkonto1 as inn_subkonto1 - , inn_organizaciya as inn_organizaciya - , summa_oborot - , summa_oborot_dt - , summa_oborot_kt - , summa_konechnyy_ostatok - , summa_konechnyy_ostatok_dt - , summa_konechnyy_ostatok_kt - , summa_konechnyy_razvernutyy_ostatok_dt - , summa_konechnyy_razvernutyy_ostatok_kt - , uid_subkonto2 - , uid_subkonto1 - , uid_organizaciya - , CURRENT_DATE::date::timestamp - FROM temp_{table_name} - --ON CONFLICT (schet, uid_subkonto2, uid_subkonto1, uid_organizaciya) - """ - ) + #ID + 'ИдентификаторДоговора': 'uid_subkonto2', + 'ИдентификаторКонтрагента': 'uid_subkonto1', + 'ИдентификаторКлиента': 'uid_organizaciya' + + #DATE + 'ДатаОтчета': 'get_date' + } + + df = df.rename(columns=field_mapping) + with engine.begin() as conn: + if not df.empty: + conn.execute(f"CREATE TEMP TABLE temp_{table_name} AS SELECT * FROM public.{table_name} WHERE 1 = 0") + df.to_sql( + f'temp_{table_name}', + con=conn, + if_exists='append', + index=False, + method='multi' + ) + # conn.execute(f"DELETE FROM public.{table_name} where get_date = (SELECT DISTINCT get_date FROM temp_{table_name})") + conn.execute(f""" + INSERT INTO public.{table_name} + SELECT + schet + , subkonto1 + , subkonto2 + , organizaciya + , case when nomer = '' then null else nomer end nomer + , case when TO_DATE(date_begin, 'DD.MM.YYYY') = '0001-01-01' then null else date_begin end date_begin + , case when TO_DATE(date_end, 'DD.MM.YYYY') = '0001-01-01' then null else date_end end date_end + , inn_subkonto1 as inn_subkonto1 + , inn_organizaciya as inn_organizaciya + , summa_oborot + , summa_oborot_dt + , summa_oborot_kt + , summa_konechnyy_ostatok + , summa_konechnyy_ostatok_dt + , summa_konechnyy_ostatok_kt + , summa_konechnyy_razvernutyy_ostatok_dt + , summa_konechnyy_razvernutyy_ostatok_kt + , uid_subkonto2 + , uid_subkonto1 + , uid_organizaciya + , CURRENT_DATE::date::timestamp + FROM temp_{table_name} + ON CONFLICT (schet, uid_subkonto2, uid_subkonto1, uid_organizaciya, get_date) + DO UPDATE SET + subkonto1 = EXCLUDED.subkonto1, + subkonto2 = EXCLUDED.subkonto2, + organizaciya = EXCLUDED.organizaciya, + nomer = EXCLUDED.nomer, + date_begin = EXCLUDED.date_begin, + date_end = EXCLUDED.date_end, + inn_subkonto1 = EXCLUDED.inn_subkonto1, + inn_organizaciya = EXCLUDED.inn_organizaciya, + summa_oborot = EXCLUDED.summa_oborot, + summa_oborot_dt = EXCLUDED.summa_oborot_dt, + summa_oborot_kt = EXCLUDED.summa_oborot_kt, + summa_konechnyy_ostatok = EXCLUDED.summa_konechnyy_ostatok, + summa_konechnyy_ostatok_dt = EXCLUDED.summa_konechnyy_ostatok_dt, + summa_konechnyy_ostatok_kt = EXCLUDED.summa_konechnyy_ostatok_kt, + summa_konechnyy_razvernutyy_ostatok_dt = EXCLUDED.summa_konechnyy_razvernutyy_ostatok_dt, + summa_konechnyy_razvernutyy_ostatok_kt = EXCLUDED.summa_konechnyy_razvernutyy_ostatok_kt + """ + ) with DAG( dag_id='data_download_from_1C_source',