sigma/dags/split_subkonto2.py

329 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import re
import requests
import json
import pandas as pd
import numpy as np
from datetime import datetime
from requests.auth import HTTPBasicAuth
from sqlalchemy import create_engine
from airflow import DAG
from airflow.operators.python import PythonOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 1, 1),
'retries': 1,
}
def get_db_engine():
"""Создает подключение к PostgreSQL"""
DF_CONFIG = {
'dbname': "postgres",
'user': "postgres",
'password': "4a00d4b90cd830da0796",
'host': "postgresql",
'port': "5432"
}
return create_engine(
f"postgresql+psycopg2://{DF_CONFIG['user']}:{DF_CONFIG['password']}@"
f"{DF_CONFIG['host']}:{DF_CONFIG['port']}/{DF_CONFIG['dbname']}",
pool_size=10,
max_overflow=20
)
RUS_MONTHS = {
"января": 1, "февраля": 2, "марта": 3, "апреля": 4,
"мая": 5, "июня": 6, "июля": 7, "августа": 8,
"сентября": 9, "октября": 10, "ноября": 11, "декабря": 12
}
# Улучшенные шаблоны для поиска дат
DATE_PATTERNS = [
# dd.mm.yyyy с возможными лишними точками и "г", "г."
re.compile(r'\b(\d{1,2}[.\-/]\d{1,2}(?:[.\-/]\d{2,4})?)\s*(?:г\.?|года)?\b', re.I),
# текстовые месяцы: 27 января 2014, 13 июля 2017г
re.compile(r'\b(\d{1,2}\s+(января|февраля|марта|апреля|мая|июня|июля|августа|сентября|октября|ноября|декабря)\s+\d{4})\s*(?:г\.?|года)?\b', re.I),
# даты после "от" без пробелов: от27.05.2024, от09 октября 2017г
re.compile(r'(?:от|с|по)\s*(\d{1,2}[.\-/]\d{1,2}[.\-/]\d{2,4})', re.I),
re.compile(r'(?:от|с|по)\s*(\d{1,2}\s*(?:января|февраля|марта|апреля|мая|июня|июля|августа|сентября|октября|ноября|декабря)\s*\d{4})', re.I)
]
# Улучшенные шаблоны для номера договора (приоритетный порядок)
NUMBER_PATTERNS = [
# Сложные номера с префиксами типа КС-ЦН-728710/2019/00107, СБО/020324-296925
re.compile(r'\b([A-Za-zА-Яа-яЁё\-]{2,}[\/\-][A-Za-zА-Яа-яЁё0-9\-\._\/]+)\b'),
# Номера после знака № (без самого знака)
re.compile(r'\s*([A-Za-zА-Яа-яЁё0-9\-\._\/]+)', re.I),
# Номера типа Н-10/2012, КС728710/2013/00053
re.compile(r'\b([A-Za-zА-Яа-яЁё]{2,}\d+[A-Za-zА-Яа-яЁё0-9\-\._\/]*)\b'),
# Номера с дефисами и слэшами
re.compile(r'\b([A-Za-zА-Яа-яЁё0-9\-\._\/]{5,})\b'),
# Простые числовые номера
re.compile(r'\b(\d{3,}[A-Za-zА-Яа-яЁё]*)\b')
]
# Ключевые слова для названий (только полные названия)
TITLE_KEYWORDS = [
"договор займа", "кредитный договор", "кредитное соглашение",
"соглашение", "кр.договор", "кр договор", "кредитного договора",
"договор ofgol", "кр.согл", "кр согл", "договор займа по процентам"
]
def normalize_spaces(s):
return re.sub(r'\s+', ' ', s).strip()
def find_date(s):
# Ищем все возможные форматы дат
for pat in DATE_PATTERNS:
m = pat.search(s)
if m:
raw = m.group(1)
start, end = m.span(1)
norm = normalize_date(raw)
if norm != raw: # Если дата нормализовалась успешно
return raw, norm, (start, end)
return None, None, None
def normalize_date(raw):
try:
# Убираем лишние символы
r = raw.lower().replace('г.', '').replace('г', '').replace('года', '').strip()
r = re.sub(r'\.+', '.', r)
r = re.sub(r'\s+', ' ', r)
# Обработка текстовых месяцев
for month_name in RUS_MONTHS.keys():
if month_name in r:
parts = re.split(r'\s+', r)
if len(parts) >= 3:
day = int(parts[0])
year = int(parts[2])
mon = RUS_MONTHS[month_name]
return f"{year:04d}-{mon:02d}-{day:02d}"
# Обработка числовых форматов
if re.match(r'\d', r):
# Заменяем все разделители на точки для единообразия
r_clean = re.sub(r'[\/\-]', '.', r)
parts = [p for p in r_clean.split('.') if p != '']
if len(parts) >= 3:
try:
# Определяем порядок: день, месяц, год
if len(parts[0]) <= 2 and len(parts[1]) <= 2:
day = int(parts[0])
month = int(parts[1])
year = int(parts[2])
else:
# Если первые части длинные, возможно это год-месяц-день
if len(parts[0]) == 4 and len(parts[1]) <= 2 and len(parts[2]) <= 2:
year = int(parts[0])
month = int(parts[1])
day = int(parts[2])
else:
return raw
if year < 100:
year += 2000
# Проверяем валидность даты
if 1 <= month <= 12 and 1 <= day <= 31:
return f"{year:04d}-{month:02d}-{day:02d}"
except:
pass
return raw
except:
return raw
def find_number(s, date_span=None):
masked = s
if date_span:
start, end = date_span
masked = s[:start] + ' ' * (end - start) + s[end:]
for pat in NUMBER_PATTERNS:
matches = list(pat.finditer(masked))
for m in matches:
candidate = m.group(1).strip(' ,.;:')
# Пропускаем очевидные не-номера
if re.fullmatch(r'(от|по|до|г|гд|договор|соглашение)', candidate, re.I):
continue
# Пропускаем чистые слова без цифр (кроме коротких префиксов)
if (re.search(r'[A-Za-zА-Яа-яЁё]', candidate) and not re.search(r'\d', candidate) and
len(candidate) > 3):
continue
# Пропускаем даты
if re.search(r'\d{1,2}[.\-/]\d{1,2}[.\-/]\d{2,4}', candidate):
continue
return candidate, m.span(1)
return None, None
def extract_title(s, number_span=None, date_span=None):
s0 = s.strip()
low = s0.lower()
# Определяем позицию обрезки
cut_pos = len(s0)
if number_span:
cut_pos = min(cut_pos, number_span[0])
if date_span:
cut_pos = min(cut_pos, date_span[0])
# Ищем слово "от" перед датой
m_ot = re.search(r'\bот\b', low)
if m_ot and m_ot.start() < cut_pos:
cut_pos = m_ot.start()
# Ищем ключевые слова
best_match = None
best_pos = len(s0)
for kw in TITLE_KEYWORDS:
if kw in low:
pos = low.find(kw)
if pos < best_pos:
best_pos = pos
best_match = kw
if best_match is not None:
# Берем только ключевое слово
return best_match.upper()
# Если нет ключевых слов, проверяем есть ли что-то перед номером/датой
title_candidate = s0[:cut_pos].strip(' ,;:.')
if title_candidate:
# Если в кандидате есть цифры или сложные символы - вероятно это часть номера
if (re.search(r'\d', title_candidate) or
re.search(r'[\/\-_]', title_candidate) or
len(title_candidate) <= 3):
return None
return title_candidate
return None
def parse_contract_cell(cell_text):
if pd.isna(cell_text) or not str(cell_text).strip():
return pd.Series({
"subkonto2": None,
"naimenovanie": None,
"nomer": None,
"date_begin": None,
"date_end": None
})
s = str(cell_text).strip()
s_norm_spaces = normalize_spaces(s)
# Поиск даты
date_raw, date_norm, date_span = find_date(s_norm_spaces)
# Поиск номера
number, number_span = find_number(s_norm_spaces, date_span)
# Название договора
title = extract_title(s_norm_spaces, number_span, date_span)
# Убираем знак № из названия если он там есть
if title and '' in title:
title = re.sub(r'\s*', '', title).strip()
return pd.Series({
"subkonto2": cell_text,
"naimenovanie": title if title == None else title.lower(),
"nomer": number,
"date_begin": date_norm,
"date_end": None
})
def read_dict_subkonto2_db(**kwargs):
engine = get_db_engine()
df = pd.read_sql("""
select
osv.subkonto2 as subkonto2
, dst.naimenovanie as naimenovanie
, osv.nomer as nomer
, osv.date_begin as date_begin
, osv.date_end as date_end
from public.oborotno_salbdovaya_vedomostb osv
left join public.dict_subkonto_two as dst
on dst.subkonto2 = osv.subkonto2
where osv.subkonto2 not in (select dst2.subkonto2 from dict_subkonto_two dst2)
and osv.nomer is null
""", engine)
return df.to_dict(orient='records')
def split_subkonto_from_1C(**kwargs):
engine = get_db_engine()
df = pd.read_sql("""
select distinct
osv.subkonto2 as subkonto2
from public.oborotno_salbdovaya_vedomostb osv
where osv.subkonto2 not in (select dst2.subkonto2 from dict_subkonto_two dst2)
""", engine)
result_df = df['subkonto2'].apply(parse_contract_cell)
return result_df.to_dict(orient='records')
def merge_dict_and_split_1C(**kwargs):
ti = kwargs['ti']
dict_subkonto = ti.xcom_pull(task_ids='read_dict_subkonto2_db')
split_subkonto = ti.xcom_pull(task_ids='split_subkonto_from_1C')
df_dict_subkonto = pd.DataFrame.from_records(dict_subkonto) if dict_subkonto else pd.DataFrame()
df_split_subkonto = pd.DataFrame.from_records(split_subkonto) if split_subkonto else pd.DataFrame()
result_df = df_split_subkonto.merge(df_dict_subkonto, how='left', on='subkonto2', suffixes=('_split', '_dict'))
result_df['naimenovanie'] = result_df['naimenovanie_split']
result_df['nomer'] = np.where(
(result_df['nomer_dict'].notna()) | (result_df['nomer_split'] != result_df['nomer_dict']),
result_df['nomer_dict'],
result_df['nomer_split']
)
result_df['date_begin'] = np.where(
result_df['date_begin_dict'].isna(),
result_df['date_begin_split'],
result_df['date_begin_dict']
)
result_df['date_end'] = result_df['date_end_dict']
result_df = result_df[['subkonto2', 'naimenovanie', 'nomer', 'date_begin', 'date_end']]
return result_df.to_dict(orient='records')
with DAG(
dag_id='split_subkonto2',
default_args=default_args,
description='Разделение Субконто2 на наименование, номер, дату начала и дату окончания договора. C последующим мэппингом справочника.',
schedule_interval=None,
catchup=False,
tags=['sigma'],
) as dag:
read_dict_subkonto2_db_task = PythonOperator(
task_id='read_dict_subkonto2_db',
python_callable=read_dict_subkonto2_db,
provide_context=True
)
split_subkonto_from_1C_task = PythonOperator(
task_id='split_subkonto_from_1C',
python_callable=split_subkonto_from_1C,
provide_context=True
)
merge_dict_and_split_1C_task = PythonOperator(
task_id='merge_dict_and_split_1C',
python_callable=merge_dict_and_split_1C,
provide_context=True
)
[read_dict_subkonto2_db_task, split_subkonto_from_1C_task] >> merge_dict_and_split_1C_task