Удалить dags/split_subkonto2.py

This commit is contained in:
bn_user 2025-11-13 20:40:34 +00:00
parent 54f59e8b2a
commit a5a9c0559c
1 changed files with 0 additions and 366 deletions

View File

@ -1,366 +0,0 @@
import re
import requests
import json
import pandas as pd
import numpy as np
from datetime import datetime
from requests.auth import HTTPBasicAuth
from sqlalchemy import create_engine
from airflow import DAG
from airflow.operators.python import PythonOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 1, 1),
'retries': 1,
}
def get_db_engine():
"""Создает подключение к PostgreSQL"""
DF_CONFIG = {
'dbname': "postgres",
'user': "postgres",
'password': "4a00d4b90cd830da0796",
'host': "postgresql",
'port': "5432"
}
return create_engine(
f"postgresql+psycopg2://{DF_CONFIG['user']}:{DF_CONFIG['password']}@"
f"{DF_CONFIG['host']}:{DF_CONFIG['port']}/{DF_CONFIG['dbname']}",
pool_size=10,
max_overflow=20
)
RUS_MONTHS = {
"января": 1, "февраля": 2, "марта": 3, "апреля": 4,
"мая": 5, "июня": 6, "июля": 7, "августа": 8,
"сентября": 9, "октября": 10, "ноября": 11, "декабря": 12
}
# Улучшенные шаблоны для поиска дат
DATE_PATTERNS = [
# dd.mm.yyyy с возможными лишними точками и "г", "г."
re.compile(r'\b(\d{1,2}[.\-/]\d{1,2}(?:[.\-/]\d{2,4})?)\s*(?:г\.?|года)?\b', re.I),
# текстовые месяцы: 27 января 2014, 13 июля 2017г
re.compile(r'\b(\d{1,2}\s+(января|февраля|марта|апреля|мая|июня|июля|августа|сентября|октября|ноября|декабря)\s+\d{4})\s*(?:г\.?|года)?\b', re.I),
# даты после "от" без пробелов: от27.05.2024, от09 октября 2017г
re.compile(r'(?:от|с|по)\s*(\d{1,2}[.\-/]\d{1,2}[.\-/]\d{2,4})', re.I),
re.compile(r'(?:от|с|по)\s*(\d{1,2}\s*(?:января|февраля|марта|апреля|мая|июня|июля|августа|сентября|октября|ноября|декабря)\s*\d{4})', re.I)
]
# Улучшенные шаблоны для номера договора (приоритетный порядок)
NUMBER_PATTERNS = [
# Сложные номера с префиксами типа КС-ЦН-728710/2019/00107, СБО/020324-296925
re.compile(r'\b([A-Za-zА-Яа-яЁё\-]{2,}[\/\-][A-Za-zА-Яа-яЁё0-9\-\._\/]+)\b'),
# Номера после знака № (без самого знака)
re.compile(r'\s*([A-Za-zА-Яа-яЁё0-9\-\._\/]+)', re.I),
# Номера типа Н-10/2012, КС728710/2013/00053
re.compile(r'\b([A-Za-zА-Яа-яЁё]{2,}\d+[A-Za-zА-Яа-яЁё0-9\-\._\/]*)\b'),
# Номера с дефисами и слэшами
re.compile(r'\b([A-Za-zА-Яа-яЁё0-9\-\._\/]{5,})\b'),
# Простые числовые номера
re.compile(r'\b(\d{3,}[A-Za-zА-Яа-яЁё]*)\b')
]
# Ключевые слова для названий (только полные названия)
TITLE_KEYWORDS = [
"договор займа", "кредитный договор", "кредитное соглашение",
"соглашение", "кр.договор", "кр договор", "кредитного договора",
"договор ofgol", "кр.согл", "кр согл", "договор займа по процентам"
]
def normalize_spaces(s):
return re.sub(r'\s+', ' ', s).strip()
def find_date(s):
# Ищем все возможные форматы дат
for pat in DATE_PATTERNS:
m = pat.search(s)
if m:
raw = m.group(1)
start, end = m.span(1)
norm = normalize_date(raw)
if norm != raw: # Если дата нормализовалась успешно
return raw, norm, (start, end)
return None, None, None
def normalize_date(raw):
try:
# Убираем лишние символы
r = raw.lower().replace('г.', '').replace('г', '').replace('года', '').strip()
r = re.sub(r'\.+', '.', r)
r = re.sub(r'\s+', ' ', r)
# Обработка текстовых месяцев
for month_name in RUS_MONTHS.keys():
if month_name in r:
parts = re.split(r'\s+', r)
if len(parts) >= 3:
day = int(parts[0])
year = int(parts[2])
mon = RUS_MONTHS[month_name]
return f"{year:04d}-{mon:02d}-{day:02d}"
# Обработка числовых форматов
if re.match(r'\d', r):
# Заменяем все разделители на точки для единообразия
r_clean = re.sub(r'[\/\-]', '.', r)
parts = [p for p in r_clean.split('.') if p != '']
if len(parts) >= 3:
try:
# Определяем порядок: день, месяц, год
if len(parts[0]) <= 2 and len(parts[1]) <= 2:
day = int(parts[0])
month = int(parts[1])
year = int(parts[2])
else:
# Если первые части длинные, возможно это год-месяц-день
if len(parts[0]) == 4 and len(parts[1]) <= 2 and len(parts[2]) <= 2:
year = int(parts[0])
month = int(parts[1])
day = int(parts[2])
else:
return raw
if year < 100:
year += 2000
# Проверяем валидность даты
if 1 <= month <= 12 and 1 <= day <= 31:
return f"{year:04d}-{month:02d}-{day:02d}"
except:
pass
return raw
except:
return raw
def find_number(s, date_span=None):
masked = s
if date_span:
start, end = date_span
masked = s[:start] + ' ' * (end - start) + s[end:]
for pat in NUMBER_PATTERNS:
matches = list(pat.finditer(masked))
for m in matches:
candidate = m.group(1).strip(' ,.;:')
# Пропускаем очевидные не-номера
if re.fullmatch(r'(от|по|до|г|гд|договор|соглашение)', candidate, re.I):
continue
# Пропускаем чистые слова без цифр (кроме коротких префиксов)
if (re.search(r'[A-Za-zА-Яа-яЁё]', candidate) and not re.search(r'\d', candidate) and
len(candidate) > 3):
continue
# Пропускаем даты
if re.search(r'\d{1,2}[.\-/]\d{1,2}[.\-/]\d{2,4}', candidate):
continue
return candidate, m.span(1)
return None, None
def extract_title(s, number_span=None, date_span=None):
s0 = s.strip()
low = s0.lower()
# Определяем позицию обрезки
cut_pos = len(s0)
if number_span:
cut_pos = min(cut_pos, number_span[0])
if date_span:
cut_pos = min(cut_pos, date_span[0])
# Ищем слово "от" перед датой
m_ot = re.search(r'\bот\b', low)
if m_ot and m_ot.start() < cut_pos:
cut_pos = m_ot.start()
# Ищем ключевые слова
best_match = None
best_pos = len(s0)
for kw in TITLE_KEYWORDS:
if kw in low:
pos = low.find(kw)
if pos < best_pos:
best_pos = pos
best_match = kw
if best_match is not None:
# Берем только ключевое слово
return best_match.upper()
# Если нет ключевых слов, проверяем есть ли что-то перед номером/датой
title_candidate = s0[:cut_pos].strip(' ,;:.')
if title_candidate:
# Если в кандидате есть цифры или сложные символы - вероятно это часть номера
if (re.search(r'\d', title_candidate) or
re.search(r'[\/\-_]', title_candidate) or
len(title_candidate) <= 3):
return None
return title_candidate
return None
def parse_contract_cell(cell_text):
if pd.isna(cell_text) or not str(cell_text).strip():
return pd.Series({
"subkonto2": None,
"naimenovanie": None,
"nomer": None,
"date_begin": None,
"date_end": None
})
s = str(cell_text).strip()
s_norm_spaces = normalize_spaces(s)
# Поиск даты
date_raw, date_norm, date_span = find_date(s_norm_spaces)
# Поиск номера
number, number_span = find_number(s_norm_spaces, date_span)
# Название договора
title = extract_title(s_norm_spaces, number_span, date_span)
# Убираем знак № из названия если он там есть
if title and '' in title:
title = re.sub(r'\s*', '', title).strip()
return pd.Series({
"subkonto2": cell_text,
"naimenovanie": title if title == None else title.lower(),
"nomer": number,
"date_begin": date_norm,
"date_end": None
})
def read_dict_subkonto2_db(**kwargs):
engine = get_db_engine()
# Extract parameters from kwargs if needed
# param_value = kwargs.get('param_name')
query = """
SELECT DISTINCT
osv.schet as schet
, osv.subkonto2 as subkonto2
, osv.nomer as nomer
, osv.date_begin as date_begin
, osv.date_end as date_end
FROM public.oborotno_salbdovaya_vedomostb osv
LEFT JOIN public.dict_subkonto_sec as dst
ON dst.subkonto2 = osv.subkonto2
WHERE osv.subkonto2 NOT IN (SELECT dst2.subkonto2 FROM public.dict_subkonto_sec dst2)
and (osv.schet like '%%01%%' or osv.schet like '%%03%%')
"""
try:
# If you need to pass parameters, do it like this:
# df = pd.read_sql(query, engine, params=(param_value,))
df = pd.read_sql(query, engine)
return df.to_dict(orient='records')
except Exception as e:
print(f"Error executing SQL query: {e}")
raise
def split_subkonto_from_1C(**kwargs):
engine = get_db_engine()
df = pd.read_sql("""
select distinct
osv.schet as schet
, osv.subkonto2 as subkonto2
from public.oborotno_salbdovaya_vedomostb osv
where osv.subkonto2 not in (select dst2.subkonto2 from public.dict_subkonto_sec dst2)
and (osv.schet like '%%01%%' or osv.schet like '%%03%%')
""", engine)
if df.empty:
return pd.DataFrame(columns=['schet, subkonto2', 'naimenovanie', 'nomer', 'date_begin', 'date_end']).to_dict(orient='records')
result_df = df.merge(df['subkonto2'].apply(parse_contract_cell), how='left', on='subkonto2')
return result_df.to_dict(orient='records')
def merge_dict_and_split_1C(**kwargs):
ti = kwargs['ti']
dict_subkonto = ti.xcom_pull(task_ids='read_dict_subkonto2_db')
split_subkonto = ti.xcom_pull(task_ids='split_subkonto_from_1C')
if not split_subkonto:
return 'Обрабатывать нечего!'
df_dict_subkonto = pd.DataFrame.from_records(dict_subkonto) if dict_subkonto else pd.DataFrame()
df_split_subkonto = pd.DataFrame.from_records(split_subkonto) if split_subkonto else pd.DataFrame()
result_df = df_split_subkonto.merge(df_dict_subkonto, how='left', on='subkonto2', suffixes=('_split', '_dict'))
result_df['schet'] = result_df['schet_dict']
result_df['naimenovanie'] = None
result_df['nomer'] = np.where(
(result_df['nomer_dict'].notna()) & (result_df['nomer_split'] != result_df['nomer_dict']),
result_df['nomer_dict'],
result_df['nomer_split']
)
result_df['date_begin'] = np.where(
result_df['date_begin_dict'].isna(),
result_df['date_begin_split'],
result_df['date_begin_dict']
)
result_df['date_end'] = result_df['date_end_dict']
result_df = result_df[['schet', 'subkonto2', 'nomer', 'date_begin', 'date_end']]
engine = get_db_engine()
with engine.begin() as conn:
conn.execute('CREATE TEMP TABLE temp_dict_subkonto_sec (schet text null, subkonto2 text null, nomer text null, date_begin text null, date_end text null)')
result_df.to_sql('temp_dict_subkonto_sec', con=conn, if_exists='append', index=False, method='multi')
conn.execute(
'''
INSERT INTO public.dict_subkonto_sec (schet, subkonto2, nomer, date_begin, date_end)
SELECT DISTINCT
schet
, subkonto2
, nomer
, date_begin
, date_end
FROM temp_dict_subkonto_sec'''
)
return 'Данные загружены!'
with DAG(
dag_id='split_subkonto2',
default_args=default_args,
description='Разделение Субконто2 на наименование, номер, дату начала и дату окончания договора. C последующим мэппингом справочника.',
schedule_interval=None,
catchup=False,
tags=['sigma'],
) as dag:
read_dict_subkonto2_db_task = PythonOperator(
task_id='read_dict_subkonto2_db',
python_callable=read_dict_subkonto2_db,
provide_context=True
)
split_subkonto_from_1C_task = PythonOperator(
task_id='split_subkonto_from_1C',
python_callable=split_subkonto_from_1C,
provide_context=True
)
merge_dict_and_split_1C_task = PythonOperator(
task_id='merge_dict_and_split_1C',
python_callable=merge_dict_and_split_1C,
provide_context=True
)
[read_dict_subkonto2_db_task, split_subkonto_from_1C_task] >> merge_dict_and_split_1C_task