Добавить dags/split_subkonto2.py

This commit is contained in:
bn_user 2025-09-08 06:22:39 +00:00
parent 2088f771f2
commit 68f244a5fb
1 changed files with 293 additions and 0 deletions

293
dags/split_subkonto2.py Normal file
View File

@ -0,0 +1,293 @@
import re
import requests
import json
import pandas as pd
import numpy as np
from datetime import datetime
from requests.auth import HTTPBasicAuth
from sqlalchemy import create_engine
from airflow import DAG
from airflow.operators.python import PythonOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 1, 1),
'retries': 1,
}
def get_db_engine():
"""Создает подключение к PostgreSQL"""
DF_CONFIG = {
'dbname': "postgres",
'user': "postgres",
'password': "4a00d4b90cd830da0796",
'host': "postgresql",
'port': "5432"
}
return create_engine(
f"postgresql+psycopg2://{DF_CONFIG['user']}:{DF_CONFIG['password']}@"
f"{DF_CONFIG['host']}:{DF_CONFIG['port']}/{DF_CONFIG['dbname']}",
pool_size=10,
max_overflow=20
)
RUS_MONTHS = {
"января": 1, "февраля": 2, "марта": 3, "апреля": 4,
"мая": 5, "июня": 6, "июля": 7, "августа": 8,
"сентября": 9, "октября": 10, "ноября": 11, "декабря": 12
}
# Улучшенные шаблоны для поиска дат
DATE_PATTERNS = [
# dd.mm.yyyy с возможными лишними точками и "г", "г."
re.compile(r'\b(\d{1,2}[.\-/]\d{1,2}(?:[.\-/]\d{2,4})?)\s*(?:г\.?|года)?\b', re.I),
# текстовые месяцы: 27 января 2014, 13 июля 2017г
re.compile(r'\b(\d{1,2}\s+(января|февраля|марта|апреля|мая|июня|июля|августа|сентября|октября|ноября|декабря)\s+\d{4})\s*(?:г\.?|года)?\b', re.I),
# даты после "от" без пробелов: от27.05.2024, от09 октября 2017г
re.compile(r'(?:от|с|по)\s*(\d{1,2}[.\-/]\d{1,2}[.\-/]\d{2,4})', re.I),
re.compile(r'(?:от|с|по)\s*(\d{1,2}\s*(?:января|февраля|марта|апреля|мая|июня|июля|августа|сентября|октября|ноября|декабря)\s*\d{4})', re.I)
]
# Улучшенные шаблоны для номера договора (приоритетный порядок)
NUMBER_PATTERNS = [
# Сложные номера с префиксами типа КС-ЦН-728710/2019/00107, СБО/020324-296925
re.compile(r'\b([A-Za-zА-Яа-яЁё\-]{2,}[\/\-][A-Za-zА-Яа-яЁё0-9\-\._\/]+)\b'),
# Номера после знака № (без самого знака)
re.compile(r'\s*([A-Za-zА-Яа-яЁё0-9\-\._\/]+)', re.I),
# Номера типа Н-10/2012, КС728710/2013/00053
re.compile(r'\b([A-Za-zА-Яа-яЁё]{2,}\d+[A-Za-zА-Яа-яЁё0-9\-\._\/]*)\b'),
# Номера с дефисами и слэшами
re.compile(r'\b([A-Za-zА-Яа-яЁё0-9\-\._\/]{5,})\b'),
# Простые числовые номера
re.compile(r'\b(\d{3,}[A-Za-zА-Яа-яЁё]*)\b')
]
# Ключевые слова для названий (только полные названия)
TITLE_KEYWORDS = [
"договор займа", "кредитный договор", "кредитное соглашение",
"соглашение", "кр.договор", "кр договор", "кредитного договора",
"договор ofgol", "кр.согл", "кр согл", "договор займа по процентам"
]
def normalize_spaces(s):
return re.sub(r'\s+', ' ', s).strip()
def find_date(s):
# Ищем все возможные форматы дат
for pat in DATE_PATTERNS:
m = pat.search(s)
if m:
raw = m.group(1)
start, end = m.span(1)
norm = normalize_date(raw)
if norm != raw: # Если дата нормализовалась успешно
return raw, norm, (start, end)
return None, None, None
def normalize_date(raw):
try:
# Убираем лишние символы
r = raw.lower().replace('г.', '').replace('г', '').replace('года', '').strip()
r = re.sub(r'\.+', '.', r)
r = re.sub(r'\s+', ' ', r)
# Обработка текстовых месяцев
for month_name in RUS_MONTHS.keys():
if month_name in r:
parts = re.split(r'\s+', r)
if len(parts) >= 3:
day = int(parts[0])
year = int(parts[2])
mon = RUS_MONTHS[month_name]
return f"{year:04d}-{mon:02d}-{day:02d}"
# Обработка числовых форматов
if re.match(r'\d', r):
# Заменяем все разделители на точки для единообразия
r_clean = re.sub(r'[\/\-]', '.', r)
parts = [p for p in r_clean.split('.') if p != '']
if len(parts) >= 3:
try:
# Определяем порядок: день, месяц, год
if len(parts[0]) <= 2 and len(parts[1]) <= 2:
day = int(parts[0])
month = int(parts[1])
year = int(parts[2])
else:
# Если первые части длинные, возможно это год-месяц-день
if len(parts[0]) == 4 and len(parts[1]) <= 2 and len(parts[2]) <= 2:
year = int(parts[0])
month = int(parts[1])
day = int(parts[2])
else:
return raw
if year < 100:
year += 2000
# Проверяем валидность даты
if 1 <= month <= 12 and 1 <= day <= 31:
return f"{year:04d}-{month:02d}-{day:02d}"
except:
pass
return raw
except:
return raw
def find_number(s, date_span=None):
masked = s
if date_span:
start, end = date_span
masked = s[:start] + ' ' * (end - start) + s[end:]
for pat in NUMBER_PATTERNS:
matches = list(pat.finditer(masked))
for m in matches:
candidate = m.group(1).strip(' ,.;:')
# Пропускаем очевидные не-номера
if re.fullmatch(r'(от|по|до|г|гд|договор|соглашение)', candidate, re.I):
continue
# Пропускаем чистые слова без цифр (кроме коротких префиксов)
if (re.search(r'[A-Za-zА-Яа-яЁё]', candidate) and not re.search(r'\d', candidate) and
len(candidate) > 3):
continue
# Пропускаем даты
if re.search(r'\d{1,2}[.\-/]\d{1,2}[.\-/]\d{2,4}', candidate):
continue
return candidate, m.span(1)
return None, None
def extract_title(s, number_span=None, date_span=None):
s0 = s.strip()
low = s0.lower()
# Определяем позицию обрезки
cut_pos = len(s0)
if number_span:
cut_pos = min(cut_pos, number_span[0])
if date_span:
cut_pos = min(cut_pos, date_span[0])
# Ищем слово "от" перед датой
m_ot = re.search(r'\bот\b', low)
if m_ot and m_ot.start() < cut_pos:
cut_pos = m_ot.start()
# Ищем ключевые слова
best_match = None
best_pos = len(s0)
for kw in TITLE_KEYWORDS:
if kw in low:
pos = low.find(kw)
if pos < best_pos:
best_pos = pos
best_match = kw
if best_match is not None:
# Берем только ключевое слово
return best_match.upper()
# Если нет ключевых слов, проверяем есть ли что-то перед номером/датой
title_candidate = s0[:cut_pos].strip(' ,;:.')
if title_candidate:
# Если в кандидате есть цифры или сложные символы - вероятно это часть номера
if (re.search(r'\d', title_candidate) or
re.search(r'[\/\-_]', title_candidate) or
len(title_candidate) <= 3):
return None
return title_candidate
return None
def parse_contract_cell(cell_text):
s = cell_text.strip()
if not s:
return {"title": None, "number": None, "date_raw": None, "date_norm": None}
s_norm_spaces = normalize_spaces(s)
# Поиск даты
date_raw, date_norm, date_span = find_date(s_norm_spaces)
# Поиск номера
number, number_span = find_number(s_norm_spaces, date_span)
# Название договора
title = extract_title(s_norm_spaces, number_span, date_span)
# Убираем знак № из названия если он там есть
if title and '' in title:
title = re.sub(r'\s*', '', title).strip()
return {
"title": title,
"number": number,
"date_raw": date_raw,
"date_norm": date_norm
}
def read_dict_subkonto2_db(**kwargs):
df = pd.read_sql("""
select
osv.subkonto2 as subkonto2
, dst.naimenovanie as naimenovanie
, coalesce(dst.nomer , osv.nomer) as nomer
, coalesce(dst.date_begin , osv.date_bedin) as date_bedin
, coalesce(dst.date_end, osv.date_end) as date_end
from public.oborotno_salbdovaya_vedomostb osv
left join public.dict_subkonto_two as dst
on dst.subkonto2 = osv.subkonto2
where osv.subkonto2 not in (select dst2.subkonto2 from dict_subkonto_two dst2)""")
return df.to_dict(orient='records')
def split_subkonto_from_1C(**kwargs):
df = pd.read_sql("""
select distinct
osv.subkonto2 as subkonto2
from public.oborotno_salbdovaya_vedomostb osv
""")
return df.to_dict(orient='records')
def merge_dict_and_split_1C(**kwargs):
ti = kwargs['ti']
with DAG(
dag_id='split_subkonto2',
default_args=default_args,
description='Разделение Субконто2 на наименование, номер, дату начала и дату окончания договора',
schedule_interval=None,
catchup=False,
tags=['sigma'],
) as dag:
read_dict_subkonto2_db_task = PythonOperator(
task_id='read_dict_subkonto2_db',
python_callable=read_dict_subkonto2_db,
provide_context=True
)
split_subkonto_from_1C_task = PythonOperator(
task_id='split_subkonto_from_1C',
python_callable=split_subkonto_from_1C,
provide_context=True
)
merge_dict_and_split_1C_task = PythonOperator(
task_id='merge_dict_and_split_1C',
python_callable=merge_dict_and_split_1C,
provide_context=True
)
[read_dict_subkonto2_db_task, split_subkonto_from_1C_task] >> merge_dict_and_split_1C_task