330 lines
13 KiB
Python
330 lines
13 KiB
Python
import re
|
||
import requests
|
||
import json
|
||
import pandas as pd
|
||
import numpy as np
|
||
from datetime import datetime
|
||
from requests.auth import HTTPBasicAuth
|
||
from sqlalchemy import create_engine
|
||
from airflow import DAG
|
||
from airflow.operators.python import PythonOperator
|
||
|
||
default_args = {
|
||
'owner': 'airflow',
|
||
'depends_on_past': False,
|
||
'start_date': datetime(2023, 1, 1),
|
||
'retries': 1,
|
||
}
|
||
|
||
def get_db_engine():
|
||
"""Создает подключение к PostgreSQL"""
|
||
DF_CONFIG = {
|
||
'dbname': "postgres",
|
||
'user': "postgres",
|
||
'password': "4a00d4b90cd830da0796",
|
||
'host': "postgresql",
|
||
'port': "5432"
|
||
}
|
||
return create_engine(
|
||
f"postgresql+psycopg2://{DF_CONFIG['user']}:{DF_CONFIG['password']}@"
|
||
f"{DF_CONFIG['host']}:{DF_CONFIG['port']}/{DF_CONFIG['dbname']}",
|
||
pool_size=10,
|
||
max_overflow=20
|
||
)
|
||
|
||
RUS_MONTHS = {
|
||
"января": 1, "февраля": 2, "марта": 3, "апреля": 4,
|
||
"мая": 5, "июня": 6, "июля": 7, "августа": 8,
|
||
"сентября": 9, "октября": 10, "ноября": 11, "декабря": 12
|
||
}
|
||
|
||
# Улучшенные шаблоны для поиска дат
|
||
DATE_PATTERNS = [
|
||
# dd.mm.yyyy с возможными лишними точками и "г", "г."
|
||
re.compile(r'\b(\d{1,2}[.\-/]\d{1,2}(?:[.\-/]\d{2,4})?)\s*(?:г\.?|года)?\b', re.I),
|
||
# текстовые месяцы: 27 января 2014, 13 июля 2017г
|
||
re.compile(r'\b(\d{1,2}\s+(января|февраля|марта|апреля|мая|июня|июля|августа|сентября|октября|ноября|декабря)\s+\d{4})\s*(?:г\.?|года)?\b', re.I),
|
||
# даты после "от" без пробелов: от27.05.2024, от09 октября 2017г
|
||
re.compile(r'(?:от|с|по)\s*(\d{1,2}[.\-/]\d{1,2}[.\-/]\d{2,4})', re.I),
|
||
re.compile(r'(?:от|с|по)\s*(\d{1,2}\s*(?:января|февраля|марта|апреля|мая|июня|июля|августа|сентября|октября|ноября|декабря)\s*\d{4})', re.I)
|
||
]
|
||
|
||
# Улучшенные шаблоны для номера договора (приоритетный порядок)
|
||
NUMBER_PATTERNS = [
|
||
# Сложные номера с префиксами типа КС-ЦН-728710/2019/00107, СБО/020324-296925
|
||
re.compile(r'\b([A-Za-zА-Яа-яЁё\-]{2,}[\/\-][A-Za-zА-Яа-яЁё0-9\-\._\/]+)\b'),
|
||
# Номера после знака № (без самого знака)
|
||
re.compile(r'№\s*([A-Za-zА-Яа-яЁё0-9\-\._\/]+)', re.I),
|
||
# Номера типа Н-10/2012, КС728710/2013/00053
|
||
re.compile(r'\b([A-Za-zА-Яа-яЁё]{2,}\d+[A-Za-zА-Яа-яЁё0-9\-\._\/]*)\b'),
|
||
# Номера с дефисами и слэшами
|
||
re.compile(r'\b([A-Za-zА-Яа-яЁё0-9\-\._\/]{5,})\b'),
|
||
# Простые числовые номера
|
||
re.compile(r'\b(\d{3,}[A-Za-zА-Яа-яЁё]*)\b')
|
||
]
|
||
|
||
# Ключевые слова для названий (только полные названия)
|
||
TITLE_KEYWORDS = [
|
||
"договор займа", "кредитный договор", "кредитное соглашение",
|
||
"соглашение", "кр.договор", "кр договор", "кредитного договора",
|
||
"договор ofgol", "кр.согл", "кр согл", "договор займа по процентам"
|
||
]
|
||
|
||
def normalize_spaces(s):
|
||
return re.sub(r'\s+', ' ', s).strip()
|
||
|
||
def find_date(s):
|
||
# Ищем все возможные форматы дат
|
||
for pat in DATE_PATTERNS:
|
||
m = pat.search(s)
|
||
if m:
|
||
raw = m.group(1)
|
||
start, end = m.span(1)
|
||
norm = normalize_date(raw)
|
||
if norm != raw: # Если дата нормализовалась успешно
|
||
return raw, norm, (start, end)
|
||
|
||
return None, None, None
|
||
|
||
def normalize_date(raw):
|
||
try:
|
||
# Убираем лишние символы
|
||
r = raw.lower().replace('г.', '').replace('г', '').replace('года', '').strip()
|
||
r = re.sub(r'\.+', '.', r)
|
||
r = re.sub(r'\s+', ' ', r)
|
||
|
||
# Обработка текстовых месяцев
|
||
for month_name in RUS_MONTHS.keys():
|
||
if month_name in r:
|
||
parts = re.split(r'\s+', r)
|
||
if len(parts) >= 3:
|
||
day = int(parts[0])
|
||
year = int(parts[2])
|
||
mon = RUS_MONTHS[month_name]
|
||
return f"{year:04d}-{mon:02d}-{day:02d}"
|
||
|
||
# Обработка числовых форматов
|
||
if re.match(r'\d', r):
|
||
# Заменяем все разделители на точки для единообразия
|
||
r_clean = re.sub(r'[\/\-]', '.', r)
|
||
parts = [p for p in r_clean.split('.') if p != '']
|
||
|
||
if len(parts) >= 3:
|
||
try:
|
||
# Определяем порядок: день, месяц, год
|
||
if len(parts[0]) <= 2 and len(parts[1]) <= 2:
|
||
day = int(parts[0])
|
||
month = int(parts[1])
|
||
year = int(parts[2])
|
||
else:
|
||
# Если первые части длинные, возможно это год-месяц-день
|
||
if len(parts[0]) == 4 and len(parts[1]) <= 2 and len(parts[2]) <= 2:
|
||
year = int(parts[0])
|
||
month = int(parts[1])
|
||
day = int(parts[2])
|
||
else:
|
||
return raw
|
||
|
||
if year < 100:
|
||
year += 2000
|
||
# Проверяем валидность даты
|
||
if 1 <= month <= 12 and 1 <= day <= 31:
|
||
return f"{year:04d}-{month:02d}-{day:02d}"
|
||
except:
|
||
pass
|
||
|
||
return raw
|
||
except:
|
||
return raw
|
||
|
||
def find_number(s, date_span=None):
|
||
masked = s
|
||
if date_span:
|
||
start, end = date_span
|
||
masked = s[:start] + ' ' * (end - start) + s[end:]
|
||
|
||
for pat in NUMBER_PATTERNS:
|
||
matches = list(pat.finditer(masked))
|
||
for m in matches:
|
||
candidate = m.group(1).strip(' ,.;:')
|
||
|
||
# Пропускаем очевидные не-номера
|
||
if re.fullmatch(r'(от|по|до|г|гд|договор|соглашение)', candidate, re.I):
|
||
continue
|
||
|
||
# Пропускаем чистые слова без цифр (кроме коротких префиксов)
|
||
if (re.search(r'[A-Za-zА-Яа-яЁё]', candidate) and not re.search(r'\d', candidate) and
|
||
len(candidate) > 3):
|
||
continue
|
||
|
||
# Пропускаем даты
|
||
if re.search(r'\d{1,2}[.\-/]\d{1,2}[.\-/]\d{2,4}', candidate):
|
||
continue
|
||
|
||
return candidate, m.span(1)
|
||
|
||
return None, None
|
||
|
||
def extract_title(s, number_span=None, date_span=None):
|
||
s0 = s.strip()
|
||
low = s0.lower()
|
||
|
||
# Определяем позицию обрезки
|
||
cut_pos = len(s0)
|
||
if number_span:
|
||
cut_pos = min(cut_pos, number_span[0])
|
||
if date_span:
|
||
cut_pos = min(cut_pos, date_span[0])
|
||
|
||
# Ищем слово "от" перед датой
|
||
m_ot = re.search(r'\bот\b', low)
|
||
if m_ot and m_ot.start() < cut_pos:
|
||
cut_pos = m_ot.start()
|
||
|
||
# Ищем ключевые слова
|
||
best_match = None
|
||
best_pos = len(s0)
|
||
|
||
for kw in TITLE_KEYWORDS:
|
||
if kw in low:
|
||
pos = low.find(kw)
|
||
if pos < best_pos:
|
||
best_pos = pos
|
||
best_match = kw
|
||
|
||
if best_match is not None:
|
||
# Берем только ключевое слово
|
||
return best_match.upper()
|
||
|
||
# Если нет ключевых слов, проверяем есть ли что-то перед номером/датой
|
||
title_candidate = s0[:cut_pos].strip(' ,;:.')
|
||
if title_candidate:
|
||
# Если в кандидате есть цифры или сложные символы - вероятно это часть номера
|
||
if (re.search(r'\d', title_candidate) or
|
||
re.search(r'[\/\-_]', title_candidate) or
|
||
len(title_candidate) <= 3):
|
||
return None
|
||
return title_candidate
|
||
|
||
return None
|
||
|
||
def parse_contract_cell(cell_text):
|
||
if pd.isna(cell_text) or not str(cell_text).strip():
|
||
return pd.Series({
|
||
"subkonto2": None,
|
||
"naimenovanie": None,
|
||
"nomer": None,
|
||
"date_begin": None,
|
||
"date_end": None
|
||
})
|
||
|
||
s = str(cell_text).strip()
|
||
s_norm_spaces = normalize_spaces(s)
|
||
|
||
# Поиск даты
|
||
date_raw, date_norm, date_span = find_date(s_norm_spaces)
|
||
|
||
# Поиск номера
|
||
number, number_span = find_number(s_norm_spaces, date_span)
|
||
|
||
# Название договора
|
||
title = extract_title(s_norm_spaces, number_span, date_span)
|
||
|
||
# Убираем знак № из названия если он там есть
|
||
if title and '№' in title:
|
||
title = re.sub(r'№\s*', '', title).strip()
|
||
|
||
return pd.Series({
|
||
"subkonto2": cell_text,
|
||
"naimenovanie": title if title == None else title.lower(),
|
||
"nomer": number,
|
||
"date_begin": date_norm,
|
||
"date_end": None
|
||
})
|
||
|
||
def read_dict_subkonto2_db(**kwargs):
|
||
engine = get_db_engine()
|
||
df = pd.read_sql("""
|
||
select
|
||
osv.subkonto2 as subkonto2
|
||
, dst.naimenovanie as naimenovanie
|
||
, osv.nomer as nomer
|
||
, osv.date_begin as date_begin
|
||
, osv.date_end as date_end
|
||
from public.oborotno_salbdovaya_vedomostb osv
|
||
left join public.dict_subkonto_two as dst
|
||
on dst.subkonto2 = osv.subkonto2
|
||
where osv.subkonto2 not in (select dst2.subkonto2 from dict_subkonto_two dst2)
|
||
and osv.nomer is null
|
||
""", engine)
|
||
return df.to_dict(orient='records')
|
||
|
||
def split_subkonto_from_1C(**kwargs):
|
||
engine = get_db_engine()
|
||
df = pd.read_sql("""
|
||
select distinct
|
||
osv.subkonto2 as subkonto2
|
||
from public.oborotno_salbdovaya_vedomostb osv
|
||
where osv.subkonto2 not in (select dst2.subkonto2 from dict_subkonto_two dst2)
|
||
""", engine)
|
||
result_df = df['subkonto2'].apply(parse_contract_cell)
|
||
return result_df.to_dict(orient='records')
|
||
|
||
def merge_dict_and_split_1C(**kwargs):
|
||
|
||
ti = kwargs['ti']
|
||
dict_subkonto = ti.xcom_pull(task_ids='read_dict_subkonto2_db')
|
||
split_subkonto = ti.xcom_pull(task_ids='split_subkonto_from_1C')
|
||
|
||
df_dict_subkonto = pd.DataFrame.from_records(dict_subkonto) if dict_subkonto else pd.DataFrame()
|
||
df_split_subkonto = pd.DataFrame.from_records(split_subkonto) if split_subkonto else pd.DataFrame()
|
||
|
||
result_df = df_split_subkonto.merge(df_dict_subkonto, how='left', on='subkonto2', suffixes=('_split', '_dict'))
|
||
return result_df[result_df['subkonto2']=='договор займа №02/11/17']
|
||
result_df['naimenovanie'] = result_df['naimenovanie_split']
|
||
|
||
result_df['nomer'] = np.where(
|
||
(result_df['nomer_dict'].notna()) & (result_df['nomer_split'] != result_df['nomer_dict']),
|
||
result_df['nomer_dict'],
|
||
result_df['nomer_split']
|
||
)
|
||
|
||
result_df['date_begin'] = np.where(
|
||
result_df['date_begin_dict'].isna(),
|
||
result_df['date_begin_split'],
|
||
result_df['date_begin_dict']
|
||
)
|
||
|
||
result_df['date_end'] = result_df['date_end_dict']
|
||
|
||
result_df = result_df[['subkonto2', 'naimenovanie', 'nomer', 'date_begin', 'date_end']]
|
||
return result_df.to_dict(orient='records')
|
||
|
||
with DAG(
|
||
dag_id='split_subkonto2',
|
||
default_args=default_args,
|
||
description='Разделение Субконто2 на наименование, номер, дату начала и дату окончания договора. C последующим мэппингом справочника.',
|
||
schedule_interval=None,
|
||
catchup=False,
|
||
tags=['sigma'],
|
||
) as dag:
|
||
|
||
read_dict_subkonto2_db_task = PythonOperator(
|
||
task_id='read_dict_subkonto2_db',
|
||
python_callable=read_dict_subkonto2_db,
|
||
provide_context=True
|
||
)
|
||
|
||
split_subkonto_from_1C_task = PythonOperator(
|
||
task_id='split_subkonto_from_1C',
|
||
python_callable=split_subkonto_from_1C,
|
||
provide_context=True
|
||
)
|
||
|
||
merge_dict_and_split_1C_task = PythonOperator(
|
||
task_id='merge_dict_and_split_1C',
|
||
python_callable=merge_dict_and_split_1C,
|
||
provide_context=True
|
||
)
|
||
|
||
[read_dict_subkonto2_db_task, split_subkonto_from_1C_task] >> merge_dict_and_split_1C_task |