Обновить dags/OSV_with_docs.py

This commit is contained in:
bn_user 2025-11-30 18:12:15 +00:00
parent b6c516037c
commit b20a7484e5
1 changed files with 171 additions and 183 deletions

View File

@ -52,203 +52,191 @@ def read_data_1C(**kwargs):
# Создаем диапазон дат от начала до T-2 # Создаем диапазон дат от начала до T-2
start_date = f'{start_year}-01-01' start_date = f'{start_year}-01-01'
date_range = pd.date_range(start=start_date, end=t_minus_2, freq='D') # date_range = pd.date_range(start=start_date, end=t_minus_2, freq='D')
# Создаем пустой список для хранения данных # Создаем пустой список для хранения данных
calendar_data = [] calendar_data = []
params = {"СписокСчетов": ["66","66.01","66.02", "66.03","66.04","66.21","66.22","66.23","66.24", "67","67.01","67.02", "67.03","67.04","67.21","67.22","67.23","67.24"]} params = {"СписокСчетов": ["66","66.01","66.02", "66.03","66.04","66.21","66.22","66.23","66.24", "67","67.01","67.02", "67.03","67.04","67.21","67.22","67.23","67.24"]}
engine = get_db_engine() engine = get_db_engine()
table_name = 'oborotno_salbdovaya_vedomostb' table_name = 'oborotno_salbdovaya_vedomostb_with_docs'
temp_table_name = f'temp_{table_name}' temp_table_name = f'temp_{table_name}'
# Проходим циклом по каждому дню # Проходим циклом по каждому дню
print("\nПрохождение по дням календаря:") # print("\nПрохождение по дням календаря:")
for i, date in enumerate(date_range): # for i, date in enumerate(date_range):
print(f'{date}') # print(f'{date}')
query = f"""ВЫБРАТЬ query = f"""ВЫБРАТЬ
ОстаткиОбороты.Счет, ОстаткиОбороты.Счет,
ОстаткиОбороты.Субконто1, ОстаткиОбороты.Регистратор,
ОстаткиОбороты.Субконто2, ОстаткиОбороты.Субконто1,
ОстаткиОбороты.Организация, ОстаткиОбороты.Субконто2,
ОстаткиОбороты.Субконто2.Номер КАК НомерДоговора, ОстаткиОбороты.Организация,
ОстаткиОбороты.Субконто2.Дата КАК ДатаДоговора, ОстаткиОбороты.Субконто2.Номер КАК НомерДоговора,
ОстаткиОбороты.Субконто2.СрокДействия КАК СрокДействияДоговора, ОстаткиОбороты.Субконто2.Дата КАК ДатаДоговора,
ОстаткиОбороты.Субконто1.Инн КАК ИннКонтрагента, ОстаткиОбороты.Субконто2.СрокДействия КАК СрокДействияДоговора,
ОстаткиОбороты.Организация.Инн КАК ИннКлиента, ОстаткиОбороты.Субконто1.Инн КАК ИннКонтрагента,
ОстаткиОбороты.СуммаОборот, ОстаткиОбороты.Организация.Инн КАК ИннКлиента,
ОстаткиОбороты.СуммаОборотДт, ОстаткиОбороты.СуммаОборот,
ОстаткиОбороты.СуммаОборотКт, ОстаткиОбороты.СуммаОборотДт,
ОстаткиОбороты.СуммаКонечныйОстаток, ОстаткиОбороты.СуммаОборотКт,
ОстаткиОбороты.СуммаКонечныйОстатокДт, ОстаткиОбороты.СуммаКонечныйОстаток,
ОстаткиОбороты.СуммаКонечныйОстатокКт, ОстаткиОбороты.СуммаКонечныйОстатокДт,
ОстаткиОбороты.СуммаКонечныйРазвернутыйОстатокДт, ОстаткиОбороты.СуммаКонечныйОстатокКт,
ОстаткиОбороты.СуммаКонечныйРазвернутыйОстатокКт, ОстаткиОбороты.СуммаКонечныйРазвернутыйОстатокДт,
UUID(ОстаткиОбороты.Субконто2.Ссылка) КАК ИдентификаторДоговора, ОстаткиОбороты.СуммаКонечныйРазвернутыйОстатокКт,
UUID(ОстаткиОбороты.Субконто1.Ссылка) КАК ИдентификаторКонтрагента, UUID(ОстаткиОбороты.Регистратор.ССылка) КАК ИдентификаторРегистратора,
UUID(ОстаткиОбороты.Организация.Ссылка) КАК ИдентификаторКлиента, UUID(ОстаткиОбороты.Субконто2.Ссылка) КАК ИдентификаторДоговора,
ОстаткиОбороты.Период КАК ДатаОтчета UUID(ОстаткиОбороты.Субконто1.Ссылка) КАК ИдентификаторКонтрагента,
ИЗ UUID(ОстаткиОбороты.Организация.Ссылка) КАК ИдентификаторКлиента,
РегистрБухгалтерии.Хозрасчетный.ОстаткиИОбороты( ДАТАВРЕМЯ({date.year},{date.month},{date.day}), ДАТАВРЕМЯ({date.year},{date.month},{date.day}), Регистратор, , Счет.Код в (&СписокСчетов), , ) КАК ОстаткиОбороты ОстаткиОбороты.Период КАК ДатаОтчета
""" ИЗ
РегистрБухгалтерии.Хозрасчетный.ОстаткиИОбороты( , ДАТАВРЕМЯ({t_minus_2.year},{t_minus_2.month},{t_minus_2.day}), Регистратор, , Счет.Код в (&СписокСчетов), , ) КАК ОстаткиОбороты
"""
# РегистрБухгалтерии.Хозрасчетный.ОстаткиИОбороты( ДАТАВРЕМЯ({date.year},{date.month},{date.day}), ДАТАВРЕМЯ({date.year},{date.month},{date.day}), Регистратор, , Счет.Код в (&СписокСчетов), , ) КАК ОстаткиОбороты
auth = HTTPBasicAuth('obmen', 'bOR2W7w4')
response = requests.post(
# http://адрес сервера/название БД/hs/services/query?ProcessPostedRequest
auth = HTTPBasicAuth('obmen', 'bOR2W7w4') url=r'http://192.168.1.75/chupd/hs/services/query?ProcessPostedRequest',
response = requests.post( json={"query":query, "params": params},
# http://адрес сервера/название БД/hs/services/query?ProcessPostedRequest auth=auth,
verify=False
)
url=r'http://192.168.1.75/chupd/hs/services/query?ProcessPostedRequest', data_from_1c = response.json()
json={"query":query, "params": params}, df = pd.DataFrame(data_from_1c['data'])
auth=auth,
verify=False
)
data_from_1c = response.json() field_mapping = {
df = pd.DataFrame(data_from_1c['data']) # Основные поля
'Счет': 'schet',
field_mapping = { 'Регистратор': 'registrator',
# Основные поля 'Субконто1': 'subkonto1',
'Счет': 'schet', 'Субконто2': 'subkonto2',
'Субконто1': 'subkonto1', 'Организация': 'organizaciya',
'Субконто2': 'subkonto2', 'НомерДоговора': 'nomer',
'Организация': 'organizaciya', 'ДатаДоговора': 'date_begin',
'НомерДоговора': 'nomer', 'СрокДействияДоговора': 'date_end',
'ДатаДоговора': 'date_begin', 'ИннКонтрагента': 'inn_subkonto1',
'СрокДействияДоговора': 'date_end', 'ИннКлиента': 'inn_organizaciya',
'ИннКонтрагента': 'inn_subkonto1',
'ИннКлиента': 'inn_organizaciya',
# Суммовые остатки и обороты
'СуммаОборот': 'summa_oborot',
'СуммаОборотДт': 'summa_oborot_dt',
'СуммаОборотКт': 'summa_oborot_kt',
'СуммаКонечныйОстаток': 'summa_konechnyy_ostatok',
'СуммаКонечныйОстатокДт': 'summa_konechnyy_ostatok_dt',
'СуммаКонечныйОстатокКт': 'summa_konechnyy_ostatok_kt',
'СуммаКонечныйРазвернутыйОстатокДт': 'summa_konechnyy_razvernutyy_ostatok_dt',
'СуммаКонечныйРазвернутыйОстатокКт': 'summa_konechnyy_razvernutyy_ostatok_kt',
#ID
'ИдентификаторДоговора': 'uid_subkonto2',
'ИдентификаторКонтрагента': 'uid_subkonto1',
'ИдентификаторКлиента': 'uid_organizaciya',
#DATE
'ДатаОтчета': 'get_date'
}
df = df.rename(columns=field_mapping)
# Удаляем дубликаты по ключевым полям перед вставкой
group_columns = [
'schet', 'subkonto1', 'subkonto2', 'organizaciya', 'nomer',
'date_begin', 'date_end', 'inn_subkonto1', 'inn_organizaciya',
'uid_subkonto2', 'uid_subkonto1', 'uid_organizaciya', 'get_date'
]
columns_for_sum = [
'summa_oborot', 'summa_oborot_dt', 'summa_oborot_kt',
'summa_konechnyy_ostatok', 'summa_konechnyy_ostatok_dt', 'summa_konechnyy_ostatok_kt',
'summa_konechnyy_razvernutyy_ostatok_dt', 'summa_konechnyy_razvernutyy_ostatok_kt'
]
sum_columns = {
'summa_oborot': 'sum', 'summa_oborot_dt': 'sum', 'summa_oborot_kt': 'sum',
'summa_konechnyy_ostatok': 'sum', 'summa_konechnyy_ostatok_dt': 'sum', 'summa_konechnyy_ostatok_kt': 'sum',
'summa_konechnyy_razvernutyy_ostatok_dt': 'sum', 'summa_konechnyy_razvernutyy_ostatok_kt': 'sum'
}
for col in columns_for_sum:
if col in df.columns:
# Очищаем и преобразуем числовые значения
df[col] = (df[col]
.str.replace('\xa0', '', regex=False) # Убираем неразрывные пробелы
.str.replace(' ', '', regex=False) # Убираем обычные пробелы
.str.replace(',', '.', regex=False) # Запятые -> точки для десятичных
.apply(lambda x: x if x.replace('.', '').replace('-', '').isdigit() else None)
.apply(pd.to_numeric, errors='coerce'))
# Диагностика
original = df[col].copy()
cleaned = df[col]
failed = cleaned.isna() & original.notna()
if failed.any():
display(f" {col}: не преобразовано {failed.sum()} значений")
display(f" Примеры: {original[failed].head(3).tolist()}")
df = df.groupby(group_columns, as_index=False).agg(sum_columns) # Суммовые остатки и обороты
'СуммаОборот': 'summa_oborot',
'СуммаОборотДт': 'summa_oborot_dt',
'СуммаОборотКт': 'summa_oborot_kt',
'СуммаКонечныйОстаток': 'summa_konechnyy_ostatok',
'СуммаКонечныйОстатокДт': 'summa_konechnyy_ostatok_dt',
'СуммаКонечныйОстатокКт': 'summa_konechnyy_ostatok_kt',
'СуммаКонечныйРазвернутыйОстатокДт': 'summa_konechnyy_razvernutyy_ostatok_dt',
'СуммаКонечныйРазвернутыйОстатокКт': 'summa_konechnyy_razvernutyy_ostatok_kt',
# Преобразуем даты из формата DD.MM.YYYY HH:MM:SS в datetime #ID
date_columns = ['date_begin', 'date_end', 'get_date'] 'ИдентификаторРегистратора': 'uid_registrator',
'ИдентификаторДоговора': 'uid_subkonto2',
'ИдентификаторКонтрагента': 'uid_subkonto1',
'ИдентификаторКлиента': 'uid_organizaciya',
for col in date_columns: #DATE
if col in df.columns: 'Период': 'get_date'
# Пробуем разные форматы дат }
df[col] = pd.to_datetime(df[col], format='%d.%m.%Y %H:%M:%S', errors='coerce')
# Если не сработало, пробуем без времени
if df[col].isna().any():
df[col] = pd.to_datetime(df[col], format='%d.%m.%Y', errors='coerce')
# Убедимся, что все даты преобразованы df = df.rename(columns=field_mapping)
for col in date_columns: # # Удаляем дубликаты по ключевым полям перед вставкой
if col in df.columns: # group_columns = [
failed_count = df[col].isna().sum() # 'schet', 'subkonto1', 'subkonto2', 'organizaciya', 'nomer',
if failed_count > 0: # 'date_begin', 'date_end', 'inn_subkonto1', 'inn_organizaciya',
print(f" Внимание: в колонке '{col}' осталось {failed_count} некорректных дат") # 'uid_subkonto2', 'uid_subkonto1', 'uid_organizaciya', 'get_date'
# ]
# columns_for_sum = [
with engine.begin() as conn: # 'summa_oborot', 'summa_oborot_dt', 'summa_oborot_kt',
if not df.empty: # 'summa_konechnyy_ostatok', 'summa_konechnyy_ostatok_dt', 'summa_konechnyy_ostatok_kt',
conn.execute(f"DROP TABLE IF EXISTS {temp_table_name}") # 'summa_konechnyy_razvernutyy_ostatok_dt', 'summa_konechnyy_razvernutyy_ostatok_kt'
conn.execute(f"CREATE TEMP TABLE {temp_table_name} (LIKE public.{table_name})") # ]
df.to_sql( # sum_columns = {
f'temp_{table_name}', # 'summa_oborot': 'sum', 'summa_oborot_dt': 'sum', 'summa_oborot_kt': 'sum',
con=conn, # 'summa_konechnyy_ostatok': 'sum', 'summa_konechnyy_ostatok_dt': 'sum', 'summa_konechnyy_ostatok_kt': 'sum',
if_exists='append', # 'summa_konechnyy_razvernutyy_ostatok_dt': 'sum', 'summa_konechnyy_razvernutyy_ostatok_kt': 'sum'
index=False, # }
method='multi' # for col in columns_for_sum:
) # if col in df.columns:
# conn.execute(f"DELETE FROM public.{table_name} where get_date = (SELECT DISTINCT get_date FROM temp_{table_name})") # # Очищаем и преобразуем числовые значения
conn.execute(f""" # df[col] = (df[col]
INSERT INTO public.{table_name} # .str.replace('\xa0', '', regex=False) # Убираем неразрывные пробелы
SELECT # .str.replace(' ', '', regex=False) # Убираем обычные пробелы
schet # .str.replace(',', '.', regex=False) # Запятые -> точки для десятичных
, subkonto1 # .apply(lambda x: x if x.replace('.', '').replace('-', '').isdigit() else None)
, subkonto2 # .apply(pd.to_numeric, errors='coerce'))
, organizaciya
, case when nomer = '' then null else nomer end nomer # # Диагностика
, case when date_begin = '0001-01-01' then null else date_begin end date_begin # original = df[col].copy()
, case when date_begin = '0001-01-01' then null else date_end end date_end # cleaned = df[col]
, inn_subkonto1 as inn_subkonto1 # failed = cleaned.isna() & original.notna()
, inn_organizaciya as inn_organizaciya
, summa_oborot # if failed.any():
, summa_oborot_dt # display(f" {col}: не преобразовано {failed.sum()} значений")
, summa_oborot_kt # display(f" Примеры: {original[failed].head(3).tolist()}")
, summa_konechnyy_ostatok
, summa_konechnyy_ostatok_dt # df = df.groupby(group_columns, as_index=False).agg(sum_columns)
, summa_konechnyy_ostatok_kt
, summa_konechnyy_razvernutyy_ostatok_dt # Преобразуем даты из формата DD.MM.YYYY HH:MM:SS в datetime
, summa_konechnyy_razvernutyy_ostatok_kt date_columns = ['date_begin', 'date_end', 'get_date']
, uid_subkonto2
, uid_subkonto1 for col in date_columns:
, uid_organizaciya if col in df.columns:
, get_date::date::timestamp # Пробуем разные форматы дат
FROM temp_{table_name} df[col] = pd.to_datetime(df[col], format='%d.%m.%Y %H:%M:%S', errors='coerce')
ON CONFLICT (schet, uid_subkonto2, uid_subkonto1, uid_organizaciya, get_date)
DO UPDATE SET # Если не сработало, пробуем без времени
subkonto1 = EXCLUDED.subkonto1, if df[col].isna().any():
subkonto2 = EXCLUDED.subkonto2, df[col] = pd.to_datetime(df[col], format='%d.%m.%Y', errors='coerce')
organizaciya = EXCLUDED.organizaciya,
nomer = EXCLUDED.nomer, # Убедимся, что все даты преобразованы
date_begin = EXCLUDED.date_begin, for col in date_columns:
date_end = EXCLUDED.date_end, if col in df.columns:
inn_subkonto1 = EXCLUDED.inn_subkonto1, failed_count = df[col].isna().sum()
inn_organizaciya = EXCLUDED.inn_organizaciya, if failed_count > 0:
summa_oborot = EXCLUDED.summa_oborot, print(f" Внимание: в колонке '{col}' осталось {failed_count} некорректных дат")
summa_oborot_dt = EXCLUDED.summa_oborot_dt,
summa_oborot_kt = EXCLUDED.summa_oborot_kt,
summa_konechnyy_ostatok = EXCLUDED.summa_konechnyy_ostatok, with engine.begin() as conn:
summa_konechnyy_ostatok_dt = EXCLUDED.summa_konechnyy_ostatok_dt, if not df.empty:
summa_konechnyy_ostatok_kt = EXCLUDED.summa_konechnyy_ostatok_kt, conn.execute(f"DROP TABLE IF EXISTS {temp_table_name}")
summa_konechnyy_razvernutyy_ostatok_dt = EXCLUDED.summa_konechnyy_razvernutyy_ostatok_dt, conn.execute(f"CREATE TEMP TABLE {temp_table_name} (LIKE public.{table_name})")
summa_konechnyy_razvernutyy_ostatok_kt = EXCLUDED.summa_konechnyy_razvernutyy_ostatok_kt df.to_sql(
""" f'temp_{table_name}',
) con=conn,
if_exists='append',
index=False,
method='multi'
)
conn.execute(f"TRUNCATE TABLE public.{table_name}")
conn.execute(f"""
INSERT INTO public.{table_name}
SELECT
schet
, registrator
, subkonto1
, subkonto2
, organizaciya
, case when nomer = '' then null else nomer end nomer
, case when date_begin = '0001-01-01' then null else date_begin end date_begin
, case when date_begin = '0001-01-01' then null else date_end end date_end
, inn_subkonto1 as inn_subkonto1
, inn_organizaciya as inn_organizaciya
, summa_oborot
, summa_oborot_dt
, summa_oborot_kt
, summa_konechnyy_ostatok
, summa_konechnyy_ostatok_dt
, summa_konechnyy_ostatok_kt
, summa_konechnyy_razvernutyy_ostatok_dt
, summa_konechnyy_razvernutyy_ostatok_kt
, uid_registrator
, uid_subkonto2
, uid_subkonto1
, uid_organizaciya
, get_date::date::timestamp
FROM temp_{table_name}
"""
)
with DAG( with DAG(
dag_id='data_download_from_1C_source', dag_id='data_download_from_1C_source',