From a5a9c0559cbd928d84238db641f2c85d9e0ac858 Mon Sep 17 00:00:00 2001 From: bn_user Date: Thu, 13 Nov 2025 20:40:34 +0000 Subject: [PATCH] =?UTF-8?q?=D0=A3=D0=B4=D0=B0=D0=BB=D0=B8=D1=82=D1=8C=20da?= =?UTF-8?q?gs/split=5Fsubkonto2.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dags/split_subkonto2.py | 366 ---------------------------------------- 1 file changed, 366 deletions(-) delete mode 100644 dags/split_subkonto2.py diff --git a/dags/split_subkonto2.py b/dags/split_subkonto2.py deleted file mode 100644 index 975549f..0000000 --- a/dags/split_subkonto2.py +++ /dev/null @@ -1,366 +0,0 @@ -import re -import requests -import json -import pandas as pd -import numpy as np -from datetime import datetime -from requests.auth import HTTPBasicAuth -from sqlalchemy import create_engine -from airflow import DAG -from airflow.operators.python import PythonOperator - -default_args = { - 'owner': 'airflow', - 'depends_on_past': False, - 'start_date': datetime(2023, 1, 1), - 'retries': 1, -} - -def get_db_engine(): - """Создает подключение к PostgreSQL""" - DF_CONFIG = { - 'dbname': "postgres", - 'user': "postgres", - 'password': "4a00d4b90cd830da0796", - 'host': "postgresql", - 'port': "5432" - } - return create_engine( - f"postgresql+psycopg2://{DF_CONFIG['user']}:{DF_CONFIG['password']}@" - f"{DF_CONFIG['host']}:{DF_CONFIG['port']}/{DF_CONFIG['dbname']}", - pool_size=10, - max_overflow=20 - ) - -RUS_MONTHS = { - "января": 1, "февраля": 2, "марта": 3, "апреля": 4, - "мая": 5, "июня": 6, "июля": 7, "августа": 8, - "сентября": 9, "октября": 10, "ноября": 11, "декабря": 12 -} - -# Улучшенные шаблоны для поиска дат -DATE_PATTERNS = [ - # dd.mm.yyyy с возможными лишними точками и "г", "г." - re.compile(r'\b(\d{1,2}[.\-/]\d{1,2}(?:[.\-/]\d{2,4})?)\s*(?:г\.?|года)?\b', re.I), - # текстовые месяцы: 27 января 2014, 13 июля 2017г - re.compile(r'\b(\d{1,2}\s+(января|февраля|марта|апреля|мая|июня|июля|августа|сентября|октября|ноября|декабря)\s+\d{4})\s*(?:г\.?|года)?\b', re.I), - # даты после "от" без пробелов: от27.05.2024, от09 октября 2017г - re.compile(r'(?:от|с|по)\s*(\d{1,2}[.\-/]\d{1,2}[.\-/]\d{2,4})', re.I), - re.compile(r'(?:от|с|по)\s*(\d{1,2}\s*(?:января|февраля|марта|апреля|мая|июня|июля|августа|сентября|октября|ноября|декабря)\s*\d{4})', re.I) -] - -# Улучшенные шаблоны для номера договора (приоритетный порядок) -NUMBER_PATTERNS = [ - # Сложные номера с префиксами типа КС-ЦН-728710/2019/00107, СБО/020324-296925 - re.compile(r'\b([A-Za-zА-Яа-яЁё\-]{2,}[\/\-][A-Za-zА-Яа-яЁё0-9\-\._\/]+)\b'), - # Номера после знака № (без самого знака) - re.compile(r'№\s*([A-Za-zА-Яа-яЁё0-9\-\._\/]+)', re.I), - # Номера типа Н-10/2012, КС728710/2013/00053 - re.compile(r'\b([A-Za-zА-Яа-яЁё]{2,}\d+[A-Za-zА-Яа-яЁё0-9\-\._\/]*)\b'), - # Номера с дефисами и слэшами - re.compile(r'\b([A-Za-zА-Яа-яЁё0-9\-\._\/]{5,})\b'), - # Простые числовые номера - re.compile(r'\b(\d{3,}[A-Za-zА-Яа-яЁё]*)\b') -] - -# Ключевые слова для названий (только полные названия) -TITLE_KEYWORDS = [ - "договор займа", "кредитный договор", "кредитное соглашение", - "соглашение", "кр.договор", "кр договор", "кредитного договора", - "договор ofgol", "кр.согл", "кр согл", "договор займа по процентам" -] - -def normalize_spaces(s): - return re.sub(r'\s+', ' ', s).strip() - -def find_date(s): - # Ищем все возможные форматы дат - for pat in DATE_PATTERNS: - m = pat.search(s) - if m: - raw = m.group(1) - start, end = m.span(1) - norm = normalize_date(raw) - if norm != raw: # Если дата нормализовалась успешно - return raw, norm, (start, end) - - return None, None, None - -def normalize_date(raw): - try: - # Убираем лишние символы - r = raw.lower().replace('г.', '').replace('г', '').replace('года', '').strip() - r = re.sub(r'\.+', '.', r) - r = re.sub(r'\s+', ' ', r) - - # Обработка текстовых месяцев - for month_name in RUS_MONTHS.keys(): - if month_name in r: - parts = re.split(r'\s+', r) - if len(parts) >= 3: - day = int(parts[0]) - year = int(parts[2]) - mon = RUS_MONTHS[month_name] - return f"{year:04d}-{mon:02d}-{day:02d}" - - # Обработка числовых форматов - if re.match(r'\d', r): - # Заменяем все разделители на точки для единообразия - r_clean = re.sub(r'[\/\-]', '.', r) - parts = [p for p in r_clean.split('.') if p != ''] - - if len(parts) >= 3: - try: - # Определяем порядок: день, месяц, год - if len(parts[0]) <= 2 and len(parts[1]) <= 2: - day = int(parts[0]) - month = int(parts[1]) - year = int(parts[2]) - else: - # Если первые части длинные, возможно это год-месяц-день - if len(parts[0]) == 4 and len(parts[1]) <= 2 and len(parts[2]) <= 2: - year = int(parts[0]) - month = int(parts[1]) - day = int(parts[2]) - else: - return raw - - if year < 100: - year += 2000 - # Проверяем валидность даты - if 1 <= month <= 12 and 1 <= day <= 31: - return f"{year:04d}-{month:02d}-{day:02d}" - except: - pass - - return raw - except: - return raw - -def find_number(s, date_span=None): - masked = s - if date_span: - start, end = date_span - masked = s[:start] + ' ' * (end - start) + s[end:] - - for pat in NUMBER_PATTERNS: - matches = list(pat.finditer(masked)) - for m in matches: - candidate = m.group(1).strip(' ,.;:') - - # Пропускаем очевидные не-номера - if re.fullmatch(r'(от|по|до|г|гд|договор|соглашение)', candidate, re.I): - continue - - # Пропускаем чистые слова без цифр (кроме коротких префиксов) - if (re.search(r'[A-Za-zА-Яа-яЁё]', candidate) and not re.search(r'\d', candidate) and - len(candidate) > 3): - continue - - # Пропускаем даты - if re.search(r'\d{1,2}[.\-/]\d{1,2}[.\-/]\d{2,4}', candidate): - continue - - return candidate, m.span(1) - - return None, None - -def extract_title(s, number_span=None, date_span=None): - s0 = s.strip() - low = s0.lower() - - # Определяем позицию обрезки - cut_pos = len(s0) - if number_span: - cut_pos = min(cut_pos, number_span[0]) - if date_span: - cut_pos = min(cut_pos, date_span[0]) - - # Ищем слово "от" перед датой - m_ot = re.search(r'\bот\b', low) - if m_ot and m_ot.start() < cut_pos: - cut_pos = m_ot.start() - - # Ищем ключевые слова - best_match = None - best_pos = len(s0) - - for kw in TITLE_KEYWORDS: - if kw in low: - pos = low.find(kw) - if pos < best_pos: - best_pos = pos - best_match = kw - - if best_match is not None: - # Берем только ключевое слово - return best_match.upper() - - # Если нет ключевых слов, проверяем есть ли что-то перед номером/датой - title_candidate = s0[:cut_pos].strip(' ,;:.') - if title_candidate: - # Если в кандидате есть цифры или сложные символы - вероятно это часть номера - if (re.search(r'\d', title_candidate) or - re.search(r'[\/\-_]', title_candidate) or - len(title_candidate) <= 3): - return None - return title_candidate - - return None - -def parse_contract_cell(cell_text): - if pd.isna(cell_text) or not str(cell_text).strip(): - return pd.Series({ - "subkonto2": None, - "naimenovanie": None, - "nomer": None, - "date_begin": None, - "date_end": None - }) - - s = str(cell_text).strip() - s_norm_spaces = normalize_spaces(s) - - # Поиск даты - date_raw, date_norm, date_span = find_date(s_norm_spaces) - - # Поиск номера - number, number_span = find_number(s_norm_spaces, date_span) - - # Название договора - title = extract_title(s_norm_spaces, number_span, date_span) - - # Убираем знак № из названия если он там есть - if title and '№' in title: - title = re.sub(r'№\s*', '', title).strip() - - return pd.Series({ - "subkonto2": cell_text, - "naimenovanie": title if title == None else title.lower(), - "nomer": number, - "date_begin": date_norm, - "date_end": None - }) - -def read_dict_subkonto2_db(**kwargs): - engine = get_db_engine() - - # Extract parameters from kwargs if needed - # param_value = kwargs.get('param_name') - - query = """ - SELECT DISTINCT - osv.schet as schet - , osv.subkonto2 as subkonto2 - , osv.nomer as nomer - , osv.date_begin as date_begin - , osv.date_end as date_end - FROM public.oborotno_salbdovaya_vedomostb osv - LEFT JOIN public.dict_subkonto_sec as dst - ON dst.subkonto2 = osv.subkonto2 - WHERE osv.subkonto2 NOT IN (SELECT dst2.subkonto2 FROM public.dict_subkonto_sec dst2) - and (osv.schet like '%%01%%' or osv.schet like '%%03%%') - """ - - try: - # If you need to pass parameters, do it like this: - # df = pd.read_sql(query, engine, params=(param_value,)) - df = pd.read_sql(query, engine) - return df.to_dict(orient='records') - except Exception as e: - print(f"Error executing SQL query: {e}") - raise - -def split_subkonto_from_1C(**kwargs): - engine = get_db_engine() - df = pd.read_sql(""" - select distinct - osv.schet as schet - , osv.subkonto2 as subkonto2 - from public.oborotno_salbdovaya_vedomostb osv - where osv.subkonto2 not in (select dst2.subkonto2 from public.dict_subkonto_sec dst2) - and (osv.schet like '%%01%%' or osv.schet like '%%03%%') - """, engine) - if df.empty: - return pd.DataFrame(columns=['schet, subkonto2', 'naimenovanie', 'nomer', 'date_begin', 'date_end']).to_dict(orient='records') - result_df = df.merge(df['subkonto2'].apply(parse_contract_cell), how='left', on='subkonto2') - return result_df.to_dict(orient='records') - -def merge_dict_and_split_1C(**kwargs): - - ti = kwargs['ti'] - dict_subkonto = ti.xcom_pull(task_ids='read_dict_subkonto2_db') - split_subkonto = ti.xcom_pull(task_ids='split_subkonto_from_1C') - if not split_subkonto: - return 'Обрабатывать нечего!' - - - df_dict_subkonto = pd.DataFrame.from_records(dict_subkonto) if dict_subkonto else pd.DataFrame() - df_split_subkonto = pd.DataFrame.from_records(split_subkonto) if split_subkonto else pd.DataFrame() - - result_df = df_split_subkonto.merge(df_dict_subkonto, how='left', on='subkonto2', suffixes=('_split', '_dict')) - result_df['schet'] = result_df['schet_dict'] - result_df['naimenovanie'] = None - - result_df['nomer'] = np.where( - (result_df['nomer_dict'].notna()) & (result_df['nomer_split'] != result_df['nomer_dict']), - result_df['nomer_dict'], - result_df['nomer_split'] - ) - - result_df['date_begin'] = np.where( - result_df['date_begin_dict'].isna(), - result_df['date_begin_split'], - result_df['date_begin_dict'] - ) - - result_df['date_end'] = result_df['date_end_dict'] - - result_df = result_df[['schet', 'subkonto2', 'nomer', 'date_begin', 'date_end']] - - engine = get_db_engine() - with engine.begin() as conn: - conn.execute('CREATE TEMP TABLE temp_dict_subkonto_sec (schet text null, subkonto2 text null, nomer text null, date_begin text null, date_end text null)') - result_df.to_sql('temp_dict_subkonto_sec', con=conn, if_exists='append', index=False, method='multi') - conn.execute( - ''' - INSERT INTO public.dict_subkonto_sec (schet, subkonto2, nomer, date_begin, date_end) - SELECT DISTINCT - schet - , subkonto2 - , nomer - , date_begin - , date_end - FROM temp_dict_subkonto_sec''' - ) - - return 'Данные загружены!' - -with DAG( - dag_id='split_subkonto2', - default_args=default_args, - description='Разделение Субконто2 на наименование, номер, дату начала и дату окончания договора. C последующим мэппингом справочника.', - schedule_interval=None, - catchup=False, - tags=['sigma'], -) as dag: - - read_dict_subkonto2_db_task = PythonOperator( - task_id='read_dict_subkonto2_db', - python_callable=read_dict_subkonto2_db, - provide_context=True - ) - - split_subkonto_from_1C_task = PythonOperator( - task_id='split_subkonto_from_1C', - python_callable=split_subkonto_from_1C, - provide_context=True - ) - - merge_dict_and_split_1C_task = PythonOperator( - task_id='merge_dict_and_split_1C', - python_callable=merge_dict_and_split_1C, - provide_context=True - ) - -[read_dict_subkonto2_db_task, split_subkonto_from_1C_task] >> merge_dict_and_split_1C_task \ No newline at end of file