From 337207cbf589c5aad51757a4b0dcd9a289991867 Mon Sep 17 00:00:00 2001 From: bn_user Date: Fri, 5 Dec 2025 08:50:09 +0000 Subject: [PATCH] =?UTF-8?q?=D0=9E=D0=B1=D0=BD=D0=BE=D0=B2=D0=B8=D1=82?= =?UTF-8?q?=D1=8C=20dags/poruchitelstva.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dags/poruchitelstva.py | 208 ++++++++++++++++++++++++++++++----------- 1 file changed, 153 insertions(+), 55 deletions(-) diff --git a/dags/poruchitelstva.py b/dags/poruchitelstva.py index d9e29cd..65c8c70 100644 --- a/dags/poruchitelstva.py +++ b/dags/poruchitelstva.py @@ -32,96 +32,194 @@ def get_db_engine(): max_overflow=20 ) -def update_pogasheniya_structure_with_detailed_errors(**kwargs): +def update_poruchitelstva_structure_with_detailed_errors(**kwargs): """ - Версия с подробными сообщениями об ошибках + Версия с подробными сообщениями об ошибках для поручительств """ engine = get_db_engine() - # Шаг 1: Проверяем существование функции get_years() + # Шаг 1: Проверяем данные в таблице sigma_gk try: with engine.connect() as conn: - # Проверяем, существует ли функция - func_exists = conn.execute(""" - SELECT EXISTS( - SELECT 1 FROM pg_proc - JOIN pg_namespace ON pg_proc.pronamespace = pg_namespace.oid - WHERE proname = 'get_years' - AND nspname = 'public' - ) - """).scalar() - - if not func_exists: - return "ERROR: Функция get_years() не существует в схеме public" - - # Проверяем существование таблицы years_for_pogasheniya + # Проверяем существование таблицы sigma_gk table_exists = conn.execute(""" SELECT EXISTS( SELECT 1 FROM information_schema.tables - WHERE table_name = 'years_for_pogasheniya' + WHERE table_name = 'sigma_gk' AND table_schema = 'public' ) """).scalar() if not table_exists: - return "ERROR: Таблица years_for_pogasheniya не существует" + return "ERROR: Таблица sigma_gk не существует" - # Проверяем, есть ли данные в таблице + # Проверяем, есть ли данные в таблице sigma_gk has_data = conn.execute(""" - SELECT EXISTS(SELECT 1 FROM years_for_pogasheniya LIMIT 1) + SELECT EXISTS(SELECT 1 FROM sigma_gk LIMIT 1) """).scalar() if not has_data: - return "ERROR: Таблица years_for_pogasheniya пустая" + return "ERROR: Таблица sigma_gk пустая" - # Вызываем функцию get_years() - result = conn.execute("SELECT * FROM get_years()").fetchone() + # Проверяем структуру таблицы sigma_gk + columns_check = conn.execute(""" + SELECT + COUNT(CASE WHEN column_name = 'poruchitel_name' THEN 1 END) as has_poruchitel_name, + COUNT(CASE WHEN column_name = 'is_active' THEN 1 END) as has_is_active + FROM information_schema.columns + WHERE table_name = 'sigma_gk' + AND table_schema = 'public' + """).fetchone() - if result is None: - return "ERROR: Функция get_years() вернула NULL. Проверьте данные в таблице" + has_poruchitel_name, has_is_active = columns_check - start_year, end_year = result + if not has_poruchitel_name: + return "ERROR: В таблице sigma_gk отсутствует колонка 'poruchitel_name'" + if not has_is_active: + return "ERROR: В таблице sigma_gk отсутствует колонка 'is_active'" - # Детальная проверка значений - error_messages = [] + # Проверяем, есть ли активные компании + active_companies_count = conn.execute(""" + SELECT COUNT(DISTINCT poruchitel_name) + FROM sigma_gk + WHERE is_active = '1' + AND poruchitel_name IS NOT NULL + AND poruchitel_name != '' + """).scalar() - if start_year is None: - error_messages.append("start_year is NULL") - if end_year is None: - error_messages.append("end_year is NULL") + if active_companies_count == 0: + # Проверяем, есть ли вообще данные в колонке poruchitel_name + any_companies = conn.execute(""" + SELECT COUNT(DISTINCT poruchitel_name) + FROM sigma_gk + WHERE poruchitel_name IS NOT NULL + AND poruchitel_name != '' + """).scalar() + + if any_companies == 0: + return "ERROR: В таблице sigma_gk нет данных в колонке poruchitel_name" + else: + return "ERROR: В таблице sigma_gk нет активных компаний (is_active = '1')" - if error_messages: - return f"ERROR: {', '.join(error_messages)}" + # Получаем список активных компаний для информации + active_companies = conn.execute(""" + SELECT DISTINCT poruchitel_name + FROM sigma_gk + WHERE is_active = '1' + AND poruchitel_name IS NOT NULL + AND poruchitel_name != '' + ORDER BY poruchitel_name + LIMIT 10 + """).fetchall() - # Преобразуем в числа с проверкой - try: - start_year_int = int(start_year) - end_year_int = int(end_year) - except ValueError: - return f"ERROR: Невозможно преобразовать годы в числа: start_year='{start_year}', end_year='{end_year}'" - - if start_year_int > end_year_int: - return f"ERROR: Неправильный порядок лет: start_year ({start_year}) > end_year ({end_year})" + companies_list = [row[0] for row in active_companies] except Exception as e: - return f"ERROR: Ошибка при проверке данных: {str(e)}" + return f"ERROR: Ошибка при проверке таблицы sigma_gk: {str(e)}" - # Шаг 2: Обновляем структуру + # Шаг 2: Проверяем существование необходимых функций + try: + with engine.connect() as conn: + # Проверяем существование функции update_poruchitelstva_system + func_exists = conn.execute(""" + SELECT EXISTS( + SELECT 1 FROM pg_proc + JOIN pg_namespace ON pg_proc.pronamespace = pg_namespace.oid + WHERE proname = 'update_poruchitelstva_system' + AND nspname = 'public' + ) + """).scalar() + + if not func_exists: + return "ERROR: Функция update_poruchitelstva_system() не существует" + + # Проверяем существование функции rebuild_poruchitelstva_table + func_exists = conn.execute(""" + SELECT EXISTS( + SELECT 1 FROM pg_proc + JOIN pg_namespace ON pg_proc.pronamespace = pg_namespace.oid + WHERE proname = 'rebuild_poruchitelstva_table' + AND nspname = 'public' + ) + """).scalar() + + if not func_exists: + return "ERROR: Функция rebuild_poruchitelstva_table() не существует" + + # Проверяем существование таблицы poruchitelstva + table_exists = conn.execute(""" + SELECT EXISTS( + SELECT 1 FROM information_schema.tables + WHERE table_name = 'poruchitelstva' + AND table_schema = 'public' + ) + """).scalar() + + if not table_exists: + print("WARNING: Таблица poruchitelstva не существует, будет создана") + + except Exception as e: + return f"ERROR: Ошибка при проверке функций: {str(e)}" + + # Шаг 3: Обновляем систему поручительств try: with engine.begin() as conn: + # Сначала обновляем конфигурацию компаний update_result = conn.execute( - "SELECT update_years_and_rebuild(%s, %s)", - (str(start_year), str(end_year)) + "SELECT update_poruchitelstva_system()" ).scalar() - return f"SUCCESS: {update_result}" + if not update_result: + return "ERROR: Функция update_poruchitelstva_system() вернула пустой результат" + + # Затем пересоздаем таблицу + conn.execute("SELECT rebuild_poruchitelstva_table()") + + # Проверяем результат + check_result = conn.execute(""" + SELECT + (SELECT COUNT(*) FROM poruchitelstva) as main_table_count, + (SELECT COUNT(*) FROM poruchitelstva_normalized) as normalized_count, + (SELECT COUNT(DISTINCT column_name) - 7 + FROM information_schema.columns + WHERE table_name = 'poruchitelstva' + AND table_schema = 'public') as company_columns_count + """).fetchone() + + main_count, normalized_count, company_cols = check_result + + result_message = ( + f"SUCCESS: Система поручительств обновлена.\n" + f"• Активных компаний: {active_companies_count}\n" + f"• Пример компаний: {', '.join(companies_list[:3])}{'...' if len(companies_list) > 3 else ''}\n" + f"• Записей в основной таблице: {main_count or 0}\n" + f"• Записей в нормализованной таблице: {normalized_count or 0}\n" + f"• Колонок компаний в таблице: {company_cols or 0}" + ) + + if normalized_count == 0: + result_message += "\nWARNING: Нормализованная таблица пуста. Возможно, нет данных о поручительствах." + + return result_message except Exception as e: - return f"ERROR: Ошибка при пересоздании таблицы: {str(e)}" + error_msg = str(e) + + # Проверяем, есть ли конкретные ошибки + if "duplicate key" in error_msg.lower(): + return f"ERROR: Обнаружены дубликаты ключей. {error_msg}" + elif "unique constraint" in error_msg.lower(): + return f"ERROR: Нарушение уникальности. {error_msg}" + elif "ambiguous column" in error_msg.lower(): + return f"ERROR: Неоднозначное имя колонки. {error_msg}" + elif "function does not exist" in error_msg.lower(): + return f"ERROR: Отсутствует необходимая функция. {error_msg}" + else: + return f"ERROR: Ошибка при обновлении системы поручительств: {error_msg}" with DAG( - dag_id='update_pogasheniya', + dag_id='update_poruchitelstva', default_args=default_args, description='Обновление списка погашений', schedule_interval=None, @@ -129,10 +227,10 @@ with DAG( tags=['sigma'], ) as dag: - pogasheniya_task = PythonOperator( - task_id='update_pogasheniya_structure_with_detailed_errors', - python_callable=update_pogasheniya_structure_with_detailed_errors, + poruchitelstva_task = PythonOperator( + task_id='update_poruchitelstva_structure_with_detailed_errors', + python_callable=update_poruchitelstva_structure_with_detailed_errors, provide_context=True ) -pogasheniya_task \ No newline at end of file +poruchitelstva_task \ No newline at end of file