Обновить dags/split_subkonto2.py

This commit is contained in:
bn_user 2025-09-08 08:16:30 +00:00
parent 68f244a5fb
commit 2090c7e025
1 changed file with 35 additions and 19 deletions

View File

@@ -209,10 +209,16 @@ def extract_title(s, number_span=None, date_span=None):
return None return None
def parse_contract_cell(cell_text): def parse_contract_cell(cell_text):
s = cell_text.strip() if pd.isna(cell_text) or not str(cell_text).strip():
if not s: return pd.Series({
return {"title": None, "number": None, "date_raw": None, "date_norm": None} "subkonto2": None,
"naimenovanie": None,
"nomer": None,
"date_bedin": None,
"date_end": None
})
s = str(cell_text).strip()
s_norm_spaces = normalize_spaces(s) s_norm_spaces = normalize_spaces(s)
# Поиск даты # Поиск даты
@@ -228,25 +234,27 @@ def parse_contract_cell(cell_text):
if title and '' in title: if title and '' in title:
title = re.sub(r'\s*', '', title).strip() title = re.sub(r'\s*', '', title).strip()
return { return pd.Series({
"title": title, "subkonto2": cell_text,
"number": number, "naimenovanie": title,
"date_raw": date_raw, "nomer": number,
"date_norm": date_norm "date_bedin": date_norm,
} "date_end": None
})
def read_dict_subkonto2_db(**kwargs): def read_dict_subkonto2_db(**kwargs):
df = pd.read_sql(""" df = pd.read_sql("""
select select
osv.subkonto2 as subkonto2 osv.subkonto2 as subkonto2
, dst.naimenovanie as naimenovanie , dst.naimenovanie as naimenovanie
, coalesce(dst.nomer , osv.nomer) as nomer , coalesce(dst.nomer , osv.nomer) as nomer
, coalesce(dst.date_begin , osv.date_bedin) as date_bedin , coalesce(dst.date_begin , osv.date_bedin) as date_bedin
, coalesce(dst.date_end, osv.date_end) as date_end , coalesce(dst.date_end, osv.date_end) as date_end
from public.oborotno_salbdovaya_vedomostb osv from public.oborotno_salbdovaya_vedomostb osv
left join public.dict_subkonto_two as dst left join public.dict_subkonto_two as dst
on dst.subkonto2 = osv.subkonto2 on dst.subkonto2 = osv.subkonto2
where osv.subkonto2 not in (select dst2.subkonto2 from dict_subkonto_two dst2)""") where osv.subkonto2 not in (select dst2.subkonto2 from dict_subkonto_two dst2)
""")
return df.to_dict(orient='records') return df.to_dict(orient='records')
def split_subkonto_from_1C(**kwargs): def split_subkonto_from_1C(**kwargs):
@@ -254,19 +262,27 @@ def split_subkonto_from_1C(**kwargs):
select distinct select distinct
osv.subkonto2 as subkonto2 osv.subkonto2 as subkonto2
from public.oborotno_salbdovaya_vedomostb osv from public.oborotno_salbdovaya_vedomostb osv
where osv.subkonto2 not in (select dst2.subkonto2 from dict_subkonto_two dst2)
""") """)
result_df = df['column_name'].apply(parse_contract_cell)
return df.to_dict(orient='records') return result_df.to_dict(orient='records')
def merge_dict_and_split_1C(**kwargs): def merge_dict_and_split_1C(**kwargs):
ti = kwargs['ti'] ti = kwargs['ti']
dict_subkonto = ti.xcom_pull(task_ids='read_dict_subkonto2_db')
split_subkonto = ti.xcom_pull(task_ids='split_subkonto_from_1C')
df_dict_subkonto = pd.DataFrame.from_records(dict_subkonto) if dict_subkonto else pd.DataFrame()
df_split_subkonto = pd.DataFrame.from_records(split_subkonto) if split_subkonto else pd.DataFrame()
df_subkonto = df_split_subkonto.merge(df_dict_subkonto, how='left', on='subkonto2')
return df_subkonto.to_dict(orient='records')
with DAG( with DAG(
dag_id='split_subkonto2', dag_id='split_subkonto2',
default_args=default_args, default_args=default_args,
description='Разделение Субконто2 на наименование, номер, дату начала и дату окончания договора', description='Разделение Субконто2 на наименование, номер, дату начала и дату окончания договора. C последующим мэппингом справочника.',
schedule_interval=None, schedule_interval=None,
catchup=False, catchup=False,
tags=['sigma'], tags=['sigma'],