Обновить dags/OSV.py

This commit is contained in:
bn_user 2025-11-18 16:59:32 +00:00
parent 77976033f3
commit c127df2687
1 changed files with 151 additions and 105 deletions

View File

@ -2,7 +2,7 @@ import requests
import json import json
import pandas as pd import pandas as pd
import numpy as np import numpy as np
from datetime import datetime from datetime import datetime, timedelta
from requests.auth import HTTPBasicAuth from requests.auth import HTTPBasicAuth
from sqlalchemy import create_engine from sqlalchemy import create_engine
from airflow import DAG from airflow import DAG
@ -32,117 +32,163 @@ def get_db_engine():
) )
def read_data_1C(**kwargs): def read_data_1C(**kwargs):
params = {"СписокСчетов": ["66","66.01","66.02", "66.03","66.04","66.21","66.22","66.23","66.24", "67","67.01","67.02", "67.03","67.04","67.21","67.22","67.23","67.24"]}
query = """ВЫБРАТЬ
ОстаткиОбороты.Счет,
ОстаткиОбороты.Субконто1,
ОстаткиОбороты.Субконто2,
ОстаткиОбороты.Организация,
ОстаткиОбороты.Субконто2.Номер КАК НомерДоговора,
ОстаткиОбороты.Субконто2.Дата КАК ДатаДоговора,
ОстаткиОбороты.Субконто2.СрокДействия КАК СрокДействияДоговора,
ОстаткиОбороты.Субконто1.Инн КАК ИннКонтрагента,
ОстаткиОбороты.Организация.Инн КАК ИннКлиента,
ОстаткиОбороты.СуммаОборот,
ОстаткиОбороты.СуммаОборотДт,
ОстаткиОбороты.СуммаОборотКт,
ОстаткиОбороты.СуммаКонечныйОстаток,
ОстаткиОбороты.СуммаКонечныйОстатокДт,
ОстаткиОбороты.СуммаКонечныйОстатокКт,
ОстаткиОбороты.СуммаКонечныйРазвернутыйОстатокДт,
ОстаткиОбороты.СуммаКонечныйРазвернутыйОстатокКт,
UUID(ОстаткиОбороты.Субконто2.Ссылка) КАК ИдентификаторДоговора,
UUID(ОстаткиОбороты.Субконто1.Ссылка) КАК ИдентификаторКонтрагента,
UUID(ОстаткиОбороты.Организация.Ссылка) КАК ИдентификаторКлиента
ИЗ
РегистрБухгалтерии.Хозрасчетный.ОстаткиИОбороты КАК ОстаткиОбороты
ГДЕ
ОстаткиОбороты.Счет.Код В (&СписокСчетов)
""" """
Создает DataFrame с ежедневным календарем и останавливается на дате T-2
auth = HTTPBasicAuth('obmen', 'bOR2W7w4') Параметры:
response = requests.post( start_year (int): начальный год (по умолчанию 2019)
# http://адрес сервера/название БД/hs/services/query?ProcessPostedRequest end_year (int): конечный год (по умолчанию 2025)
url=r'http://192.168.1.75/chupd/hs/services/query?ProcessPostedRequest', Возвращает:
json={"query":query, "params": params}, pandas.DataFrame: DataFrame с колонками Год, Месяц, День
auth=auth, """
verify=False start_year = 2019
) # Вычисляем дату T-2 (текущая дата минус 2 дня)
t_minus_2 = datetime.now() - timedelta(days=2)
t_minus_2 = t_minus_2.replace(hour=0, minute=0, second=0, microsecond=0)
data_from_1c = response.json() print(f"Текущая дата: {datetime.now().strftime('%Y-%m-%d')}")
df = pd.DataFrame(data_from_1c['data']) print(f"Дата T-2: {t_minus_2.strftime('%Y-%m-%d')}")
engine = get_db_engine() # Создаем диапазон дат от начала до T-2
table_name = 'oborotno_salbdovaya_vedomostb' start_date = f'{start_year}-01-01'
field_mapping = { date_range = pd.date_range(start=start_date, end=t_minus_2, freq='D')
# Основные поля
'Счет': 'schet',
'Субконто1': 'subkonto1',
'Субконто2': 'subkonto2',
'Организация': 'organizaciya',
'НомерДоговора': 'nomer',
'ДатаДоговора': 'date_begin',
'СрокДействияДоговора': 'date_end',
'ИннКонтрагента': 'inn_subkonto1',
'ИннКлиента': 'inn_organizaciya',
# Суммовые остатки и обороты # Создаем пустой список для хранения данных
'СуммаОборот': 'summa_oborot', calendar_data = []
'СуммаОборотДт': 'summa_oborot_dt', params = {"СписокСчетов": ["66","66.01","66.02", "66.03","66.04","66.21","66.22","66.23","66.24", "67","67.01","67.02", "67.03","67.04","67.21","67.22","67.23","67.24"]}
'СуммаОборотКт': 'summa_oborot_kt', # Проходим циклом по каждому дню
'СуммаКонечныйОстаток': 'summa_konechnyy_ostatok', print("\nПрохождение по дням календаря:")
'СуммаКонечныйОстатокДт': 'summa_konechnyy_ostatok_dt', for i, date in enumerate(date_range):
'СуммаКонечныйОстатокКт': 'summa_konechnyy_ostatok_kt', query = f"""ВЫБРАТЬ
'СуммаКонечныйРазвернутыйОстатокДт': 'summa_konechnyy_razvernutyy_ostatok_dt', ОстаткиОбороты.Счет,
'СуммаКонечныйРазвернутыйОстатокКт': 'summa_konechnyy_razvernutyy_ostatok_kt', ОстаткиОбороты.Субконто1,
ОстаткиОбороты.Субконто2,
ОстаткиОбороты.Организация,
ОстаткиОбороты.Субконто2.Номер КАК НомерДоговора,
ОстаткиОбороты.Субконто2.Дата КАК ДатаДоговора,
ОстаткиОбороты.Субконто2.СрокДействия КАК СрокДействияДоговора,
ОстаткиОбороты.Субконто1.Инн КАК ИннКонтрагента,
ОстаткиОбороты.Организация.Инн КАК ИннКлиента,
ОстаткиОбороты.СуммаОборот,
ОстаткиОбороты.СуммаОборотДт,
ОстаткиОбороты.СуммаОборотКт,
ОстаткиОбороты.СуммаКонечныйОстаток,
ОстаткиОбороты.СуммаКонечныйОстатокДт,
ОстаткиОбороты.СуммаКонечныйОстатокКт,
ОстаткиОбороты.СуммаКонечныйРазвернутыйОстатокДт,
ОстаткиОбороты.СуммаКонечныйРазвернутыйОстатокКт,
UUID(ОстаткиОбороты.Субконто2.Ссылка) КАК ИдентификаторДоговора,
UUID(ОстаткиОбороты.Субконто1.Ссылка) КАК ИдентификаторКонтрагента,
UUID(ОстаткиОбороты.Организация.Ссылка) КАК ИдентификаторКлиента,
ОстаткиОбороты.Период КАК ДатаОтчета
ИЗ
РегистрБухгалтерии.Хозрасчетный.ОстаткиИОбороты( ДАТАВРЕМЯ({date.year},{date.month},{date.day}), ДАТАВРЕМЯ({date.year},{date.month},{date.day}), Регистратор, , Счет.Код в (&СписокСчетов), , ) КАК ОстаткиОбороты
"""
#ID auth = HTTPBasicAuth('obmen', 'bOR2W7w4')
'ИдентификаторДоговора': 'uid_subkonto2', response = requests.post(
'ИдентификаторКонтрагента': 'uid_subkonto1', # http://адрес сервера/название БД/hs/services/query?ProcessPostedRequest
'ИдентификаторКлиента': 'uid_organizaciya'
}
df = df.rename(columns=field_mapping) url=r'http://192.168.1.75/chupd/hs/services/query?ProcessPostedRequest',
with engine.begin() as conn: json={"query":query, "params": params},
if not df.empty: auth=auth,
conn.execute(f"CREATE TEMP TABLE temp_{table_name} AS SELECT * FROM public.{table_name} WHERE 1 = 0") verify=False
df.to_sql( )
f'temp_{table_name}',
con=conn, data_from_1c = response.json()
if_exists='append', df = pd.DataFrame(data_from_1c['data'])
index=False, engine = get_db_engine()
method='multi' table_name = 'oborotno_salbdovaya_vedomostb'
)
conn.execute(f"DELETE FROM public.{table_name} where get_date = CURRENT_DATE::date::timestamp") field_mapping = {
conn.execute(f""" # Основные поля
INSERT INTO public.{table_name} 'Счет': 'schet',
SELECT 'Субконто1': 'subkonto1',
schet 'Субконто2': 'subkonto2',
, subkonto1 'Организация': 'organizaciya',
, subkonto2 'НомерДоговора': 'nomer',
, organizaciya 'ДатаДоговора': 'date_begin',
, case when nomer = '' then null else nomer end nomer 'СрокДействияДоговора': 'date_end',
, case when TO_DATE(date_begin, 'DD.MM.YYYY') = '0001-01-01' then null else date_begin end date_begin 'ИннКонтрагента': 'inn_subkonto1',
, case when TO_DATE(date_end, 'DD.MM.YYYY') = '0001-01-01' then null else date_end end date_end 'ИннКлиента': 'inn_organizaciya',
, inn_subkonto1 as inn_subkonto1
, inn_organizaciya as inn_organizaciya # Суммовые остатки и обороты
, summa_oborot 'СуммаОборот': 'summa_oborot',
, summa_oborot_dt 'СуммаОборотДт': 'summa_oborot_dt',
, summa_oborot_kt 'СуммаОборотКт': 'summa_oborot_kt',
, summa_konechnyy_ostatok 'СуммаКонечныйОстаток': 'summa_konechnyy_ostatok',
, summa_konechnyy_ostatok_dt 'СуммаКонечныйОстатокДт': 'summa_konechnyy_ostatok_dt',
, summa_konechnyy_ostatok_kt 'СуммаКонечныйОстатокКт': 'summa_konechnyy_ostatok_kt',
, summa_konechnyy_razvernutyy_ostatok_dt 'СуммаКонечныйРазвернутыйОстатокДт': 'summa_konechnyy_razvernutyy_ostatok_dt',
, summa_konechnyy_razvernutyy_ostatok_kt 'СуммаКонечныйРазвернутыйОстатокКт': 'summa_konechnyy_razvernutyy_ostatok_kt',
, uid_subkonto2
, uid_subkonto1 #ID
, uid_organizaciya 'ИдентификаторДоговора': 'uid_subkonto2',
, CURRENT_DATE::date::timestamp 'ИдентификаторКонтрагента': 'uid_subkonto1',
FROM temp_{table_name} 'ИдентификаторКлиента': 'uid_organizaciya'
--ON CONFLICT (schet, uid_subkonto2, uid_subkonto1, uid_organizaciya)
""" #DATE
) 'ДатаОтчета': 'get_date'
}
df = df.rename(columns=field_mapping)
with engine.begin() as conn:
if not df.empty:
conn.execute(f"CREATE TEMP TABLE temp_{table_name} AS SELECT * FROM public.{table_name} WHERE 1 = 0")
df.to_sql(
f'temp_{table_name}',
con=conn,
if_exists='append',
index=False,
method='multi'
)
# conn.execute(f"DELETE FROM public.{table_name} where get_date = (SELECT DISTINCT get_date FROM temp_{table_name})")
conn.execute(f"""
INSERT INTO public.{table_name}
SELECT
schet
, subkonto1
, subkonto2
, organizaciya
, case when nomer = '' then null else nomer end nomer
, case when TO_DATE(date_begin, 'DD.MM.YYYY') = '0001-01-01' then null else date_begin end date_begin
, case when TO_DATE(date_end, 'DD.MM.YYYY') = '0001-01-01' then null else date_end end date_end
, inn_subkonto1 as inn_subkonto1
, inn_organizaciya as inn_organizaciya
, summa_oborot
, summa_oborot_dt
, summa_oborot_kt
, summa_konechnyy_ostatok
, summa_konechnyy_ostatok_dt
, summa_konechnyy_ostatok_kt
, summa_konechnyy_razvernutyy_ostatok_dt
, summa_konechnyy_razvernutyy_ostatok_kt
, uid_subkonto2
, uid_subkonto1
, uid_organizaciya
, CURRENT_DATE::date::timestamp
FROM temp_{table_name}
ON CONFLICT (schet, uid_subkonto2, uid_subkonto1, uid_organizaciya, get_date)
DO UPDATE SET
subkonto1 = EXCLUDED.subkonto1,
subkonto2 = EXCLUDED.subkonto2,
organizaciya = EXCLUDED.organizaciya,
nomer = EXCLUDED.nomer,
date_begin = EXCLUDED.date_begin,
date_end = EXCLUDED.date_end,
inn_subkonto1 = EXCLUDED.inn_subkonto1,
inn_organizaciya = EXCLUDED.inn_organizaciya,
summa_oborot = EXCLUDED.summa_oborot,
summa_oborot_dt = EXCLUDED.summa_oborot_dt,
summa_oborot_kt = EXCLUDED.summa_oborot_kt,
summa_konechnyy_ostatok = EXCLUDED.summa_konechnyy_ostatok,
summa_konechnyy_ostatok_dt = EXCLUDED.summa_konechnyy_ostatok_dt,
summa_konechnyy_ostatok_kt = EXCLUDED.summa_konechnyy_ostatok_kt,
summa_konechnyy_razvernutyy_ostatok_dt = EXCLUDED.summa_konechnyy_razvernutyy_ostatok_dt,
summa_konechnyy_razvernutyy_ostatok_kt = EXCLUDED.summa_konechnyy_razvernutyy_ostatok_kt
"""
)
with DAG( with DAG(
dag_id='data_download_from_1C_source', dag_id='data_download_from_1C_source',