diff --git a/dags/split_subkonto2.py b/dags/split_subkonto2.py index b83199a..86ff48b 100644 --- a/dags/split_subkonto2.py +++ b/dags/split_subkonto2.py @@ -209,10 +209,16 @@ def extract_title(s, number_span=None, date_span=None): return None def parse_contract_cell(cell_text): - s = cell_text.strip() - if not s: - return {"title": None, "number": None, "date_raw": None, "date_norm": None} + if pd.isna(cell_text) or not str(cell_text).strip(): + return pd.Series({ + "subkonto2": None, + "naimenovanie": None, + "nomer": None, + "date_bedin": None, + "date_end": None + }) + s = str(cell_text).strip() s_norm_spaces = normalize_spaces(s) # Поиск даты @@ -228,25 +234,27 @@ def parse_contract_cell(cell_text): if title and '№' in title: title = re.sub(r'№\s*', '', title).strip() - return { - "title": title, - "number": number, - "date_raw": date_raw, - "date_norm": date_norm - } + return pd.Series({ + "subkonto2": cell_text, + "naimenovanie": title, + "nomer": number, + "date_bedin": date_norm, + "date_end": None + }) def read_dict_subkonto2_db(**kwargs): df = pd.read_sql(""" select osv.subkonto2 as subkonto2 - , dst.naimenovanie as naimenovanie - , coalesce(dst.nomer , osv.nomer) as nomer - , coalesce(dst.date_begin , osv.date_bedin) as date_bedin - , coalesce(dst.date_end, osv.date_end) as date_end + , dst.naimenovanie as naimenovanie + , coalesce(dst.nomer , osv.nomer) as nomer + , coalesce(dst.date_begin , osv.date_bedin) as date_bedin + , coalesce(dst.date_end, osv.date_end) as date_end from public.oborotno_salbdovaya_vedomostb osv - left join public.dict_subkonto_two as dst + left join public.dict_subkonto_two as dst on dst.subkonto2 = osv.subkonto2 - where osv.subkonto2 not in (select dst2.subkonto2 from dict_subkonto_two dst2)""") + where osv.subkonto2 not in (select dst2.subkonto2 from dict_subkonto_two dst2) + """) return df.to_dict(orient='records') def split_subkonto_from_1C(**kwargs): @@ -254,19 +262,27 @@ def split_subkonto_from_1C(**kwargs): select distinct osv.subkonto2 as subkonto2 from public.oborotno_salbdovaya_vedomostb osv + where osv.subkonto2 not in (select dst2.subkonto2 from dict_subkonto_two dst2) """) - - return df.to_dict(orient='records') + result_df = df['column_name'].apply(parse_contract_cell) + return result_df.to_dict(orient='records') def merge_dict_and_split_1C(**kwargs): ti = kwargs['ti'] - + dict_subkonto = ti.xcom_pull(task_ids='read_dict_subkonto2_db') + split_subkonto = ti.xcom_pull(task_ids='split_subkonto_from_1C') + + df_dict_subkonto = pd.DataFrame.from_records(dict_subkonto) if dict_subkonto else pd.DataFrame() + df_split_subkonto = pd.DataFrame.from_records(split_subkonto) if split_subkonto else pd.DataFrame() + + df_subkonto = df_split_subkonto.merge(df_dict_subkonto, how='left', on='subkonto2') + return df_subkonto.to_dict(orient='records') with DAG( dag_id='split_subkonto2', default_args=default_args, - description='Разделение Субконто2 на наименование, номер, дату начала и дату окончания договора', + description='Разделение Субконто2 на наименование, номер, дату начала и дату окончания договора. C последующим мэппингом справочника.', schedule_interval=None, catchup=False, tags=['sigma'],