sigma/dags/OSV_with_docs.py

452 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import requests
import json
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from requests.auth import HTTPBasicAuth
from sqlalchemy import create_engine, text
from airflow import DAG
from airflow.operators.python import PythonOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 1, 1),
'retries': 1,
}
def get_db_engine():
"""Создает подключение к PostgreSQL"""
DF_CONFIG = {
'dbname': "postgres",
'user': "postgres",
'password': "4a00d4b90cd830da0796",
'host': "postgresql",
'port': "5432"
}
return create_engine(
f"postgresql+psycopg2://{DF_CONFIG['user']}:{DF_CONFIG['password']}@"
f"{DF_CONFIG['host']}:{DF_CONFIG['port']}/{DF_CONFIG['dbname']}",
pool_size=10,
max_overflow=20
)
def read_data_1C(**kwargs):
"""
Создает DataFrame с ежедневным календарем и останавливается на дате T-2
Параметры:
start_year (int): начальный год (по умолчанию 2019)
end_year (int): конечный год (по умолчанию 2025)
Возвращает:
pandas.DataFrame: DataFrame с колонками Год, Месяц, День
"""
start_year = 2019
# Вычисляем дату T-2 (текущая дата минус 2 дня)
t_minus_2 = datetime.now() - timedelta(days=2)
t_minus_2 = t_minus_2.replace(hour=0, minute=0, second=0, microsecond=0)
print(f"Текущая дата: {datetime.now().strftime('%Y-%m-%d')}")
print(f"Дата T-2: {t_minus_2.strftime('%Y-%m-%d')}")
# Создаем диапазон дат от начала до T-2
start_date = f'{start_year}-01-01'
# date_range = pd.date_range(start=start_date, end=t_minus_2, freq='D')
# Создаем пустой список для хранения данных
calendar_data = []
params = {"СписокСчетов": ["66","66.01","66.02", "66.03","66.04","66.21","66.22","66.23","66.24", "67","67.01","67.02", "67.03","67.04","67.21","67.22","67.23","67.24"]}
engine = get_db_engine()
table_name = 'oborotno_salbdovaya_vedomostb_with_docs'
temp_table_name = f'temp_{table_name}'
# Проходим циклом по каждому дню
# print("\nПрохождение по дням календаря:")
# for i, date in enumerate(date_range):
# print(f'{date}')
query = f"""ВЫБРАТЬ
ОстаткиОбороты.Счет,
ОстаткиОбороты.Регистратор,
ОстаткиОбороты.Субконто1,
ОстаткиОбороты.Субконто2,
ОстаткиОбороты.Организация,
ОстаткиОбороты.Субконто2.Номер КАК НомерДоговора,
ОстаткиОбороты.Субконто2.Дата КАК ДатаДоговора,
ОстаткиОбороты.Субконто2.СрокДействия КАК СрокДействияДоговора,
ОстаткиОбороты.Субконто1.Инн КАК ИннКонтрагента,
ОстаткиОбороты.Организация.Инн КАК ИннКлиента,
ОстаткиОбороты.СуммаОборот,
ОстаткиОбороты.СуммаОборотДт,
ОстаткиОбороты.СуммаОборотКт,
ОстаткиОбороты.СуммаКонечныйОстаток,
ОстаткиОбороты.СуммаКонечныйОстатокДт,
ОстаткиОбороты.СуммаКонечныйОстатокКт,
ОстаткиОбороты.СуммаКонечныйРазвернутыйОстатокДт,
ОстаткиОбороты.СуммаКонечныйРазвернутыйОстатокКт,
UUID(ОстаткиОбороты.Регистратор.ССылка) КАК ИдентификаторРегистратора,
UUID(ОстаткиОбороты.Субконто2.Ссылка) КАК ИдентификаторДоговора,
UUID(ОстаткиОбороты.Субконто1.Ссылка) КАК ИдентификаторКонтрагента,
UUID(ОстаткиОбороты.Организация.Ссылка) КАК ИдентификаторКлиента,
ОстаткиОбороты.Период КАК ДатаОтчета
ИЗ
РегистрБухгалтерии.Хозрасчетный.ОстаткиИОбороты( , ДАТАВРЕМЯ({t_minus_2.year},{t_minus_2.month},{t_minus_2.day}), Регистратор, , Счет.Код в (&СписокСчетов), , ) КАК ОстаткиОбороты
"""
# РегистрБухгалтерии.Хозрасчетный.ОстаткиИОбороты( ДАТАВРЕМЯ({date.year},{date.month},{date.day}), ДАТАВРЕМЯ({date.year},{date.month},{date.day}), Регистратор, , Счет.Код в (&СписокСчетов), , ) КАК ОстаткиОбороты
auth = HTTPBasicAuth('obmen', 'bOR2W7w4')
response = requests.post(
# http://адрес сервера/название БД/hs/services/query?ProcessPostedRequest
url=r'http://192.168.1.75/chupd/hs/services/query?ProcessPostedRequest',
json={"query":query, "params": params},
auth=auth,
verify=False
)
data_from_1c = response.json()
df = pd.DataFrame(data_from_1c['data'])
field_mapping = {
# Основные поля
'Счет': 'schet',
'Регистратор': 'registrator',
'Субконто1': 'subkonto1',
'Субконто2': 'subkonto2',
'Организация': 'organizaciya',
'НомерДоговора': 'nomer',
'ДатаДоговора': 'date_begin',
'СрокДействияДоговора': 'date_end',
'ИннКонтрагента': 'inn_subkonto1',
'ИннКлиента': 'inn_organizaciya',
# Суммовые остатки и обороты
'СуммаОборот': 'summa_oborot',
'СуммаОборотДт': 'summa_oborot_dt',
'СуммаОборотКт': 'summa_oborot_kt',
'СуммаКонечныйОстаток': 'summa_konechnyy_ostatok',
'СуммаКонечныйОстатокДт': 'summa_konechnyy_ostatok_dt',
'СуммаКонечныйОстатокКт': 'summa_konechnyy_ostatok_kt',
'СуммаКонечныйРазвернутыйОстатокДт': 'summa_konechnyy_razvernutyy_ostatok_dt',
'СуммаКонечныйРазвернутыйОстатокКт': 'summa_konechnyy_razvernutyy_ostatok_kt',
#ID
'ИдентификаторРегистратора': 'uid_registrator',
'ИдентификаторДоговора': 'uid_subkonto2',
'ИдентификаторКонтрагента': 'uid_subkonto1',
'ИдентификаторКлиента': 'uid_organizaciya',
#DATE
'ДатаОтчета': 'get_date'
}
df = df.rename(columns=field_mapping)
# # Удаляем дубликаты по ключевым полям перед вставкой
# group_columns = [
# 'schet', 'subkonto1', 'subkonto2', 'organizaciya', 'nomer',
# 'date_begin', 'date_end', 'inn_subkonto1', 'inn_organizaciya',
# 'uid_subkonto2', 'uid_subkonto1', 'uid_organizaciya', 'get_date'
# ]
# columns_for_sum = [
# 'summa_oborot', 'summa_oborot_dt', 'summa_oborot_kt',
# 'summa_konechnyy_ostatok', 'summa_konechnyy_ostatok_dt', 'summa_konechnyy_ostatok_kt',
# 'summa_konechnyy_razvernutyy_ostatok_dt', 'summa_konechnyy_razvernutyy_ostatok_kt'
# ]
# sum_columns = {
# 'summa_oborot': 'sum', 'summa_oborot_dt': 'sum', 'summa_oborot_kt': 'sum',
# 'summa_konechnyy_ostatok': 'sum', 'summa_konechnyy_ostatok_dt': 'sum', 'summa_konechnyy_ostatok_kt': 'sum',
# 'summa_konechnyy_razvernutyy_ostatok_dt': 'sum', 'summa_konechnyy_razvernutyy_ostatok_kt': 'sum'
# }
# for col in columns_for_sum:
# if col in df.columns:
# # Очищаем и преобразуем числовые значения
# df[col] = (df[col]
# .str.replace('\xa0', '', regex=False) # Убираем неразрывные пробелы
# .str.replace(' ', '', regex=False) # Убираем обычные пробелы
# .str.replace(',', '.', regex=False) # Запятые -> точки для десятичных
# .apply(lambda x: x if x.replace('.', '').replace('-', '').isdigit() else None)
# .apply(pd.to_numeric, errors='coerce'))
# # Диагностика
# original = df[col].copy()
# cleaned = df[col]
# failed = cleaned.isna() & original.notna()
# if failed.any():
# display(f" {col}: не преобразовано {failed.sum()} значений")
# display(f" Примеры: {original[failed].head(3).tolist()}")
# df = df.groupby(group_columns, as_index=False).agg(sum_columns)
# Преобразуем даты из формата DD.MM.YYYY HH:MM:SS в datetime
date_columns = ['date_begin', 'date_end', 'get_date']
for col in date_columns:
if col in df.columns:
# Пробуем разные форматы дат
df[col] = pd.to_datetime(df[col], format='%d.%m.%Y %H:%M:%S', errors='coerce')
# Если не сработало, пробуем без времени
if df[col].isna().any():
df[col] = pd.to_datetime(df[col], format='%d.%m.%Y', errors='coerce')
# Убедимся, что все даты преобразованы
for col in date_columns:
if col in df.columns:
failed_count = df[col].isna().sum()
if failed_count > 0:
print(f" Внимание: в колонке '{col}' осталось {failed_count} некорректных дат")
with engine.begin() as conn:
if not df.empty:
conn.execute(f"DROP TABLE IF EXISTS {temp_table_name}")
conn.execute(f"CREATE TEMP TABLE {temp_table_name} (LIKE public.{table_name})")
df.to_sql(
f'temp_{table_name}',
con=conn,
if_exists='append',
index=False,
method='multi'
)
conn.execute(f"TRUNCATE TABLE public.{table_name}")
conn.execute(f"""
INSERT INTO public.{table_name}
SELECT
schet
, registrator
, subkonto1
, subkonto2
, organizaciya
, case when nomer = '' then null else nomer end nomer
, case when date_begin = '0001-01-01' then null else date_begin end date_begin
, case when date_begin = '0001-01-01' then null else date_end end date_end
, inn_subkonto1 as inn_subkonto1
, inn_organizaciya as inn_organizaciya
, summa_oborot
, summa_oborot_dt
, summa_oborot_kt
, summa_konechnyy_ostatok
, summa_konechnyy_ostatok_dt
, summa_konechnyy_ostatok_kt
, summa_konechnyy_razvernutyy_ostatok_dt
, summa_konechnyy_razvernutyy_ostatok_kt
, uid_registrator
, uid_subkonto2
, uid_subkonto1
, uid_organizaciya
, get_date::date::timestamp
FROM temp_{table_name}
"""
)
def upsert_list_fin_portfel(**kwargs):
engine = get_db_engine()
query = """
select distinct
osv.uid_subkonto2 uid_dogovor
, osv.schet as schet
, osv.subkonto2 as name
, null as summa_dogovora
, null as percent_value
from public.oborotno_salbdovaya_vedomostb osv
left join public.fin_porfel fp
on fp.schet = osv.schet
and fp.uid_dogovor = osv.uid_subkonto2
where (osv.schet like '%%01%%' or osv.schet like '%%03%%')
and fp.uid_dogovor is null
"""
df = pd.read_sql(query, engine)
with engine.begin() as conn:
if not df.empty:
conn.execute("CREATE TEMP TABLE temp_fin_porfel (uid_dogovor text null, schet text null, name text null, summa_dogovora text null, percent_value text null)")
df.to_sql('temp_fin_porfel', con=conn, if_exists='append', index=False, method='multi')
conn.execute("""
INSERT INTO public.fin_porfel (uid_dogovor, schet, name, summa_dogovora, percent_value)
SELECT DISTINCT
uid_dogovor
, schet
, name
, summa_dogovora
, percent_value
FROM temp_fin_porfel
ON CONFLICT (uid_dogovor, schet)
DO UPDATE SET
name = EXCLUDED.name,
summa_dogovora = EXCLUDED.summa_dogovora,
percent_value = EXCLUDED.percent_value
"""
)
conn.execute("""
UPDATE public.fin_porfel fp
SET id = subquery.new_id
FROM (
SELECT
id,
ROW_NUMBER() OVER (ORDER BY uid_dogovor, schet, summa_dogovora) as new_id
FROM public.fin_porfel
) AS subquery
WHERE fp.id = subquery.id;
"""
)
return 'Список обновлен.'
else:
return 'Обновлять нечего.'
def pogasheniya(**kwargs):
engine = get_db_engine()
query = text("""
SELECT DISTINCT
osv.schet
, osv.uid_subkonto2 as uid_dogovor
, osv.subkonto2 as name
, osv.nomer
, osv.date_begin
, osv.date_end
FROM oborotno_salbdovaya_vedomostb osv
LEFT JOIN pogasheniya p on p.uid_dogovor = osv.uid_subkonto2 and p.schet = osv.schet
WHERE p.uid_dogovor is null and (osv.schet::text like '%03%' OR osv.schet::text like '%01%')
UNION ALL
SELECT
'76.07.1' as schet
, '00000000-0000-0000-0000-000000000000' as uid_dogovor
, 'Лизинг' as name
, osv.agreement_num as nomer
, osv.agreement_date as date_begin
, osv.redemption_date as date_end
FROM lizingi_garantii osv
LEFT JOIN pogasheniya p on p.nomer = osv.agreement_num
WHERE p.uid_dogovor is null
""")
df = pd.read_sql(query, engine)
with engine.begin() as conn:
if not df.empty:
conn.execute("CREATE TEMP TABLE temp_pogasheniya (schet text null, uid_dogovor text null, name text null, nomer text null, date_begin text null, date_end text null)")
df.to_sql('temp_pogasheniya', con=conn, if_exists='append', index=False, method='multi')
conn.execute("""
INSERT INTO public.pogasheniya (schet, uid_dogovor, name, nomer, date_begin, date_end)
SELECT * FROM temp_pogasheniya
""")
# ON CONFLICT (schet, uid_dogovor)
# DO UPDATE SET
# name = EXCLUDED.name,
# nomer = EXCLUDED.nomer,
# date_begin = EXCLUDED.date_begin,
# date_end = EXCLUDED.date_end
conn.execute("""
UPDATE public.pogasheniya fp
SET id = subquery.new_id
FROM (
SELECT
id,
ROW_NUMBER() OVER (ORDER BY uid_dogovor, schet, nomer) as new_id
FROM public.pogasheniya
) AS subquery
WHERE fp.id = subquery.id;
"""
)
return 'Список обновлен.'
else:
return 'Обновлять нечего.'
def poruchitelstva(**kwargs):
engine = get_db_engine()
query = text("""
SELECT DISTINCT
osv.schet
, osv.uid_subkonto2 as uid_dogovor
, osv.subkonto2 as name
, osv.nomer
, osv.date_begin
, osv.date_end
FROM oborotno_salbdovaya_vedomostb osv
LEFT JOIN poruchitelstva p on p.uid_dogovor = osv.uid_subkonto2 and p.schet = osv.schet
LEFT JOIN sigma_gk sg_sub ON sg_sub.inn = osv.inn_subkonto1::text
LEFT JOIN sigma_gk sg_org ON sg_org.inn = osv.inn_organizaciya::text
WHERE p.uid_dogovor is null and (osv.schet::text like '%03%' OR osv.schet::text like '%01%') and (case when sg_sub.inn is not NULL AND sg_org.inn is not null then 1 else 0 end = 0)
UNION ALL
SELECT
case when osv.category_name = 'Лизинг' then '76.07.1' else '76.09' end as schet
, osv.id::text as uid_dogovor
, osv.category_name as name
, osv.agreement_num as nomer
, osv.agreement_date as date_begin
, osv.redemption_date as date_end
FROM lizingi_garantii osv
LEFT JOIN poruchitelstva p on p.nomer = osv.agreement_num
WHERE p.uid_dogovor is null
UNION ALL
SELECT
'009' as schet
, osv.id::text as uid_dogovor
, osv.category_name as name
, osv.agreement_num as nomer
, osv.agreement_date as date_begin
, osv.redemption_date as date_end
FROM zalogi osv
LEFT JOIN poruchitelstva p on p.nomer = osv.agreement_num
WHERE p.uid_dogovor is null
""")
df = pd.read_sql(query, engine)
with engine.begin() as conn:
if not df.empty:
conn.execute("CREATE TEMP TABLE temp_poruchitelstva (schet text null, uid_dogovor text null, name text null, nomer text null, date_begin text null, date_end text null)")
df.to_sql('temp_poruchitelstva', con=conn, if_exists='append', index=False, method='multi')
conn.execute("""
INSERT INTO public.poruchitelstva (schet, uid_dogovor, name, nomer, date_begin, date_end)
SELECT * FROM temp_poruchitelstva
""")
# ON CONFLICT (schet, uid_dogovor)
# DO UPDATE SET
# name = EXCLUDED.name,
# nomer = EXCLUDED.nomer,
# date_begin = EXCLUDED.date_begin,
# date_end = EXCLUDED.date_end
conn.execute("""
UPDATE public.poruchitelstva fp
SET id = subquery.new_id
FROM (
SELECT
id,
ROW_NUMBER() OVER (ORDER BY uid_dogovor, schet, nomer) as new_id
FROM public.poruchitelstva
) AS subquery
WHERE fp.id = subquery.id;
"""
)
return 'Список обновлен.'
else:
return 'Обновлять нечего.'
with DAG(
dag_id='data_download_from_1C_source_with_docs',
default_args=default_args,
description='Выгрузка данных из 1С',
schedule_interval=None, #"0,30 01-10 * * *",
catchup=False,
tags=['sigma'],
) as dag:
read_data_1C_task = PythonOperator(
task_id="read_data_1C",
python_callable=read_data_1C,
provide_context=True
)
upsert_list_fin_portfel_task = PythonOperator(
task_id='upsert_list_fin_portfel',
python_callable=upsert_list_fin_portfel,
provide_context=True
)
pogasheniya_task = PythonOperator(
task_id='pogasheniya',
python_callable=pogasheniya,
provide_context=True
)
poruchitelstva_task = PythonOperator(
task_id='poruchitelstva',
python_callable=poruchitelstva,
provide_context=True
)
read_data_1C_task >> [upsert_list_fin_portfel_task, pogasheniya_task, poruchitelstva_task]