import re import requests import json import pandas as pd import numpy as np from datetime import datetime from requests.auth import HTTPBasicAuth from sqlalchemy import create_engine from airflow import DAG from airflow.operators.python import PythonOperator default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2023, 1, 1), 'retries': 1, } def get_db_engine(): """Создает подключение к PostgreSQL""" DF_CONFIG = { 'dbname': "postgres", 'user': "postgres", 'password': "4a00d4b90cd830da0796", 'host': "postgresql", 'port': "5432" } return create_engine( f"postgresql+psycopg2://{DF_CONFIG['user']}:{DF_CONFIG['password']}@" f"{DF_CONFIG['host']}:{DF_CONFIG['port']}/{DF_CONFIG['dbname']}", pool_size=10, max_overflow=20 ) RUS_MONTHS = { "января": 1, "февраля": 2, "марта": 3, "апреля": 4, "мая": 5, "июня": 6, "июля": 7, "августа": 8, "сентября": 9, "октября": 10, "ноября": 11, "декабря": 12 } # Улучшенные шаблоны для поиска дат DATE_PATTERNS = [ # dd.mm.yyyy с возможными лишними точками и "г", "г." re.compile(r'\b(\d{1,2}[.\-/]\d{1,2}(?:[.\-/]\d{2,4})?)\s*(?:г\.?|года)?\b', re.I), # текстовые месяцы: 27 января 2014, 13 июля 2017г re.compile(r'\b(\d{1,2}\s+(января|февраля|марта|апреля|мая|июня|июля|августа|сентября|октября|ноября|декабря)\s+\d{4})\s*(?:г\.?|года)?\b', re.I), # даты после "от" без пробелов: от27.05.2024, от09 октября 2017г re.compile(r'(?:от|с|по)\s*(\d{1,2}[.\-/]\d{1,2}[.\-/]\d{2,4})', re.I), re.compile(r'(?:от|с|по)\s*(\d{1,2}\s*(?:января|февраля|марта|апреля|мая|июня|июля|августа|сентября|октября|ноября|декабря)\s*\d{4})', re.I) ] # Улучшенные шаблоны для номера договора (приоритетный порядок) NUMBER_PATTERNS = [ # Сложные номера с префиксами типа КС-ЦН-728710/2019/00107, СБО/020324-296925 re.compile(r'\b([A-Za-zА-Яа-яЁё\-]{2,}[\/\-][A-Za-zА-Яа-яЁё0-9\-\._\/]+)\b'), # Номера после знака № (без самого знака) re.compile(r'№\s*([A-Za-zА-Яа-яЁё0-9\-\._\/]+)', re.I), # Номера типа Н-10/2012, КС728710/2013/00053 re.compile(r'\b([A-Za-zА-Яа-яЁё]{2,}\d+[A-Za-zА-Яа-яЁё0-9\-\._\/]*)\b'), # Номера с дефисами и слэшами re.compile(r'\b([A-Za-zА-Яа-яЁё0-9\-\._\/]{5,})\b'), # Простые числовые номера re.compile(r'\b(\d{3,}[A-Za-zА-Яа-яЁё]*)\b') ] # Ключевые слова для названий (только полные названия) TITLE_KEYWORDS = [ "договор займа", "кредитный договор", "кредитное соглашение", "соглашение", "кр.договор", "кр договор", "кредитного договора", "договор ofgol", "кр.согл", "кр согл", "договор займа по процентам" ] def normalize_spaces(s): return re.sub(r'\s+', ' ', s).strip() def find_date(s): # Ищем все возможные форматы дат for pat in DATE_PATTERNS: m = pat.search(s) if m: raw = m.group(1) start, end = m.span(1) norm = normalize_date(raw) if norm != raw: # Если дата нормализовалась успешно return raw, norm, (start, end) return None, None, None def normalize_date(raw): try: # Убираем лишние символы r = raw.lower().replace('г.', '').replace('г', '').replace('года', '').strip() r = re.sub(r'\.+', '.', r) r = re.sub(r'\s+', ' ', r) # Обработка текстовых месяцев for month_name in RUS_MONTHS.keys(): if month_name in r: parts = re.split(r'\s+', r) if len(parts) >= 3: day = int(parts[0]) year = int(parts[2]) mon = RUS_MONTHS[month_name] return f"{year:04d}-{mon:02d}-{day:02d}" # Обработка числовых форматов if re.match(r'\d', r): # Заменяем все разделители на точки для единообразия r_clean = re.sub(r'[\/\-]', '.', r) parts = [p for p in r_clean.split('.') if p != ''] if len(parts) >= 3: try: # Определяем порядок: день, месяц, год if len(parts[0]) <= 2 and len(parts[1]) <= 2: day = int(parts[0]) month = int(parts[1]) year = int(parts[2]) else: # Если первые части длинные, возможно это год-месяц-день if len(parts[0]) == 4 and len(parts[1]) <= 2 and len(parts[2]) <= 2: year = int(parts[0]) month = int(parts[1]) day = int(parts[2]) else: return raw if year < 100: year += 2000 # Проверяем валидность даты if 1 <= month <= 12 and 1 <= day <= 31: return f"{year:04d}-{month:02d}-{day:02d}" except: pass return raw except: return raw def find_number(s, date_span=None): masked = s if date_span: start, end = date_span masked = s[:start] + ' ' * (end - start) + s[end:] for pat in NUMBER_PATTERNS: matches = list(pat.finditer(masked)) for m in matches: candidate = m.group(1).strip(' ,.;:') # Пропускаем очевидные не-номера if re.fullmatch(r'(от|по|до|г|гд|договор|соглашение)', candidate, re.I): continue # Пропускаем чистые слова без цифр (кроме коротких префиксов) if (re.search(r'[A-Za-zА-Яа-яЁё]', candidate) and not re.search(r'\d', candidate) and len(candidate) > 3): continue # Пропускаем даты if re.search(r'\d{1,2}[.\-/]\d{1,2}[.\-/]\d{2,4}', candidate): continue return candidate, m.span(1) return None, None def extract_title(s, number_span=None, date_span=None): s0 = s.strip() low = s0.lower() # Определяем позицию обрезки cut_pos = len(s0) if number_span: cut_pos = min(cut_pos, number_span[0]) if date_span: cut_pos = min(cut_pos, date_span[0]) # Ищем слово "от" перед датой m_ot = re.search(r'\bот\b', low) if m_ot and m_ot.start() < cut_pos: cut_pos = m_ot.start() # Ищем ключевые слова best_match = None best_pos = len(s0) for kw in TITLE_KEYWORDS: if kw in low: pos = low.find(kw) if pos < best_pos: best_pos = pos best_match = kw if best_match is not None: # Берем только ключевое слово return best_match.upper() # Если нет ключевых слов, проверяем есть ли что-то перед номером/датой title_candidate = s0[:cut_pos].strip(' ,;:.') if title_candidate: # Если в кандидате есть цифры или сложные символы - вероятно это часть номера if (re.search(r'\d', title_candidate) or re.search(r'[\/\-_]', title_candidate) or len(title_candidate) <= 3): return None return title_candidate return None def parse_contract_cell(cell_text): if pd.isna(cell_text) or not str(cell_text).strip(): return pd.Series({ "subkonto2": None, "naimenovanie": None, "nomer": None, "date_begin": None, "date_end": None }) s = str(cell_text).strip() s_norm_spaces = normalize_spaces(s) # Поиск даты date_raw, date_norm, date_span = find_date(s_norm_spaces) # Поиск номера number, number_span = find_number(s_norm_spaces, date_span) # Название договора title = extract_title(s_norm_spaces, number_span, date_span) # Убираем знак № из названия если он там есть if title and '№' in title: title = re.sub(r'№\s*', '', title).strip() return pd.Series({ "subkonto2": cell_text, "naimenovanie": title if title == None else title.lower(), "nomer": number, "date_begin": date_norm, "date_end": None }) def read_dict_subkonto2_db(**kwargs): engine = get_db_engine() df = pd.read_sql(""" select osv.subkonto2 as subkonto2 , dst.naimenovanie as naimenovanie , osv.nomer as nomer , osv.date_begin as date_begin , osv.date_end as date_end from public.oborotno_salbdovaya_vedomostb osv left join public.dict_subkonto_two as dst on dst.subkonto2 = osv.subkonto2 where osv.subkonto2 not in (select dst2.subkonto2 from public.dict_subkonto_two dst2) and (osv.nomer is null or osv.date_begin is null or osv.date_end is null) """, engine) return df.to_dict(orient='records') def split_subkonto_from_1C(**kwargs): engine = get_db_engine() df = pd.read_sql(""" select distinct osv.subkonto2 as subkonto2 from public.oborotno_salbdovaya_vedomostb osv where osv.subkonto2 not in (select dst2.subkonto2 from public.dict_subkonto_two dst2) """, engine) result_df = df['subkonto2'].apply(parse_contract_cell) return result_df.to_dict(orient='records') def merge_dict_and_split_1C(**kwargs): ti = kwargs['ti'] dict_subkonto = ti.xcom_pull(task_ids='read_dict_subkonto2_db') split_subkonto = ti.xcom_pull(task_ids='split_subkonto_from_1C') if not split_subkonto: return 'Обрабатывать нечего!' df_dict_subkonto = pd.DataFrame.from_records(dict_subkonto) if dict_subkonto else pd.DataFrame() df_split_subkonto = pd.DataFrame.from_records(split_subkonto) if split_subkonto else pd.DataFrame() result_df = df_split_subkonto.merge(df_dict_subkonto, how='left', on='subkonto2', suffixes=('_split', '_dict')) result_df['naimenovanie'] = result_df['naimenovanie_split'] result_df['nomer'] = np.where( (result_df['nomer_dict'].notna()) & (result_df['nomer_split'] != result_df['nomer_dict']), result_df['nomer_dict'], result_df['nomer_split'] ) result_df['date_begin'] = np.where( result_df['date_begin_dict'].isna(), result_df['date_begin_split'], result_df['date_begin_dict'] ) result_df['date_end'] = result_df['date_end_dict'] result_df = result_df[['subkonto2', 'naimenovanie', 'nomer', 'date_begin', 'date_end']] engine = get_db_engine() with engine.begin() as conn: conn.execute('CREATE TEMP TABLE temp_dict_subkonto_two (subkonto2 text null, naimenovanie text null, nomer text null, date_begin text null, date_end text null)') result_df.to_sql('temp_dict_subkonto_two', con=conn, if_exists='append', index=False, method='multi') conn.execute('INSERT INTO public.dict_subkonto_two SELECT * FROM temp_dict_subkonto_two') return result_df.to_dict('Данные загружены!') with DAG( dag_id='split_subkonto2', default_args=default_args, description='Разделение Субконто2 на наименование, номер, дату начала и дату окончания договора. C последующим мэппингом справочника.', schedule_interval=None, catchup=False, tags=['sigma'], ) as dag: read_dict_subkonto2_db_task = PythonOperator( task_id='read_dict_subkonto2_db', python_callable=read_dict_subkonto2_db, provide_context=True ) split_subkonto_from_1C_task = PythonOperator( task_id='split_subkonto_from_1C', python_callable=split_subkonto_from_1C, provide_context=True ) merge_dict_and_split_1C_task = PythonOperator( task_id='merge_dict_and_split_1C', python_callable=merge_dict_and_split_1C, provide_context=True ) [read_dict_subkonto2_db_task, split_subkonto_from_1C_task] >> merge_dict_and_split_1C_task