diff --git a/dags/poruchitelstva.py b/dags/poruchitelstva.py index 255d9bf..ea68369 100644 --- a/dags/poruchitelstva.py +++ b/dags/poruchitelstva.py @@ -38,9 +38,22 @@ def update_poruchitelstva_structure_with_detailed_errors(**kwargs): """ engine = get_db_engine() - # Шаг 1: Проверяем данные в таблице sigma_gk + # Шаг 1: Проверяем существование необходимых объектов try: with engine.connect() as conn: + # Проверяем существование функции get_active_companies() + func_exists = conn.execute(""" + SELECT EXISTS( + SELECT 1 FROM pg_proc + JOIN pg_namespace ON pg_proc.pronamespace = pg_namespace.oid + WHERE proname = 'get_active_companies' + AND nspname = 'public' + ) + """).scalar() + + if not func_exists: + return "ERROR: Функция get_active_companies() не существует в схеме public" + # Проверяем существование таблицы sigma_gk table_exists = conn.execute(""" SELECT EXISTS( @@ -53,177 +66,106 @@ def update_poruchitelstva_structure_with_detailed_errors(**kwargs): if not table_exists: return "ERROR: Таблица sigma_gk не существует" - # Проверяем, есть ли данные в таблице sigma_gk - has_data = conn.execute(""" - SELECT EXISTS(SELECT 1 FROM sigma_gk LIMIT 1) - """).scalar() - - if not has_data: - return "ERROR: Таблица sigma_gk пустая" - - # Проверяем структуру таблицы sigma_gk - columns_check = conn.execute(""" - SELECT - COUNT(CASE WHEN column_name = 'poruchitel_name' THEN 1 END) as has_poruchitel_name, - COUNT(CASE WHEN column_name = 'is_active' THEN 1 END) as has_is_active - FROM information_schema.columns - WHERE table_name = 'sigma_gk' - AND table_schema = 'public' - """).fetchone() - - has_poruchitel_name, has_is_active = columns_check - - if not has_poruchitel_name: - return "ERROR: В таблице sigma_gk отсутствует колонка 'poruchitel_name'" - if not has_is_active: - return "ERROR: В таблице sigma_gk отсутствует колонка 'is_active'" - # Проверяем, есть ли активные компании - active_companies_count = conn.execute(""" - SELECT COUNT(DISTINCT poruchitel_name) + active_count = conn.execute(""" + SELECT COUNT(*) FROM sigma_gk WHERE is_active = '1' AND poruchitel_name IS NOT NULL AND poruchitel_name != '' """).scalar() - if active_companies_count == 0: - # Проверяем, есть ли вообще данные в колонке poruchitel_name - any_companies = conn.execute(""" - SELECT COUNT(DISTINCT poruchitel_name) + if active_count == 0: + # Получаем список всех компаний для отладки + all_companies = conn.execute(""" + SELECT name, inn, is_active, poruchitel_name FROM sigma_gk - WHERE poruchitel_name IS NOT NULL - AND poruchitel_name != '' - """).scalar() + LIMIT 10 + """).fetchall() - if any_companies == 0: - return "ERROR: В таблице sigma_gk нет данных в колонке poruchitel_name" - else: - return "ERROR: В таблице sigma_gk нет активных компаний (is_active = '1')" + companies_info = "\n".join([ + f"{row[0]} (ИНН: {row[1]}, active: {row[2]}, poruchitel: {row[3]})" + for row in all_companies + ]) + + return f"ERROR: Нет активных компаний в sigma_gk\n\nДоступные компании:\n{companies_info}" # Получаем список активных компаний для информации active_companies = conn.execute(""" - SELECT DISTINCT poruchitel_name + SELECT poruchitel_name FROM sigma_gk WHERE is_active = '1' AND poruchitel_name IS NOT NULL AND poruchitel_name != '' ORDER BY poruchitel_name - LIMIT 10 """).fetchall() - companies_list = [row[0] for row in active_companies] + companies_list = ", ".join([row[0] for row in active_companies]) except Exception as e: - return f"ERROR: Ошибка при проверке таблицы sigma_gk: {str(e)}" + return f"ERROR: Ошибка при проверке данных: {str(e)}" - # Шаг 2: Проверяем существование необходимых функций - try: - with engine.connect() as conn: - # Проверяем существование функции update_poruchitelstva_system - func_exists = conn.execute(""" - SELECT EXISTS( - SELECT 1 FROM pg_proc - JOIN pg_namespace ON pg_proc.pronamespace = pg_namespace.oid - WHERE proname = 'update_poruchitelstva_system' - AND nspname = 'public' - ) - """).scalar() - - if not func_exists: - return "ERROR: Функция update_poruchitelstva_system() не существует" - - # Проверяем существование функции rebuild_poruchitelstva_table - func_exists = conn.execute(""" - SELECT EXISTS( - SELECT 1 FROM pg_proc - JOIN pg_namespace ON pg_proc.pronamespace = pg_namespace.oid - WHERE proname = 'rebuild_poruchitelstva_table' - AND nspname = 'public' - ) - """).scalar() - - if not func_exists: - return "ERROR: Функция rebuild_poruchitelstva_table() не существует" - - # Проверяем существование таблицы poruchitelstva - table_exists = conn.execute(""" - SELECT EXISTS( - SELECT 1 FROM information_schema.tables - WHERE table_name = 'poruchitelstva' - AND table_schema = 'public' - ) - """).scalar() - - if not table_exists: - print("WARNING: Таблица poruchitelstva не существует, будет создана") - - except Exception as e: - return f"ERROR: Ошибка при проверке функций: {str(e)}" - - # Шаг 3: Обновляем систему поручительств + # Шаг 2: Обновляем структуру таблицы poruchitelstva try: with engine.begin() as conn: - # Сначала обновляем конфигурацию компаний + # Вызываем функцию обновления update_result = conn.execute( - "SELECT update_poruchitelstva_system()" + "SELECT update_companies_and_rebuild()" ).scalar() - if not update_result: - return "ERROR: Функция update_poruchitelstva_system() вернула пустой результат" + # Добавляем информацию о компаниях в результат + result_message = f"{update_result}\n\nАктивные компании: {companies_list}\nВсего активных компаний: {active_count}" - # Затем пересоздаем таблицу - conn.execute("SELECT rebuild_poruchitelstva_table()") - - # Проверяем результат - check_result = conn.execute(""" - SELECT - (SELECT COUNT(*) FROM poruchitelstva) as main_table_count, - (SELECT COUNT(DISTINCT column_name) - 7 - FROM information_schema.columns - WHERE table_name = 'poruchitelstva' - AND table_schema = 'public') as company_columns_count - """).fetchone() - - main_count, company_cols = check_result - - result_message = ( - f"SUCCESS: Система поручительств обновлена.\n" - f"• Активных компаний: {active_companies_count}\n" - f"• Пример компаний: {', '.join(companies_list[:3])}{'...' if len(companies_list) > 3 else ''}\n" - f"• Записей в основной таблице: {main_count or 0}\n" - f"• Колонок компаний в таблице: {company_cols or 0}" - ) - - return result_message + return f"SUCCESS: {result_message}" except Exception as e: - error_msg = str(e) + error_detail = str(e) - # Проверяем, есть ли конкретные ошибки - if "duplicate key" in error_msg.lower(): - return f"ERROR: Обнаружены дубликаты ключей. {error_msg}" - elif "unique constraint" in error_msg.lower(): - return f"ERROR: Нарушение уникальности. {error_msg}" - elif "ambiguous column" in error_msg.lower(): - return f"ERROR: Неоднозначное имя колонки. {error_msg}" - elif "function does not exist" in error_msg.lower(): - return f"ERROR: Отсутствует необходимая функция. {error_msg}" - else: - return f"ERROR: Ошибка при обновлении системы поручительств: {error_msg}" + # Дополнительная диагностика при ошибках + diagnostic_info = "" + try: + with engine.connect() as conn2: + # Проверяем существование таблицы poruchitelstva + por_table_exists = conn2.execute(""" + SELECT EXISTS( + SELECT 1 FROM information_schema.tables + WHERE table_name = 'poruchitelstva' + AND table_schema = 'public' + ) + """).scalar() + + if por_table_exists: + # Проверяем структуру таблицы + columns = conn2.execute(""" + SELECT column_name, data_type + FROM information_schema.columns + WHERE table_name = 'poruchitelstva' + AND table_schema = 'public' + ORDER BY ordinal_position + """).fetchall() + + columns_list = "\n".join([f" - {row[0]} ({row[1]})" for row in columns]) + diagnostic_info = f"\n\nТекущая структура poruchitelstva:\n{columns_list}" + else: + diagnostic_info = "\n\nТаблица poruchitelstva не существует" + + except Exception as diag_error: + diagnostic_info = f"\n\nНе удалось получить диагностическую информацию: {str(diag_error)}" + + return f"ERROR: Ошибка при пересоздании таблицы: {error_detail}{diagnostic_info}" with DAG( dag_id='update_poruchitelstva', default_args=default_args, - description='Обновление списка погашений', + description='Обновление таблицы поручительств', schedule_interval=None, catchup=False, - tags=['sigma'], + tags=['sigma', 'poruchitelstva'], ) as dag: poruchitelstva_task = PythonOperator( - task_id='update_poruchitelstva_structure_with_detailed_errors', + task_id='update_poruchitelstva_structure', python_callable=update_poruchitelstva_structure_with_detailed_errors, provide_context=True )