import requests import json import pandas as pd import numpy as np from datetime import datetime from requests.auth import HTTPBasicAuth from sqlalchemy import create_engine, text from airflow import DAG from airflow.operators.python import PythonOperator import logging default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2023, 1, 1), 'retries': 1, } def get_db_engine(): """Создает подключение к PostgreSQL""" DF_CONFIG = { 'dbname': "postgres", 'user': "postgres", 'password': "4a00d4b90cd830da0796", 'host': "postgresql", 'port': "5432" } return create_engine( f"postgresql+psycopg2://{DF_CONFIG['user']}:{DF_CONFIG['password']}@" f"{DF_CONFIG['host']}:{DF_CONFIG['port']}/{DF_CONFIG['dbname']}", pool_size=10, max_overflow=20 ) def update_poruchitelstva_structure_with_detailed_errors(**kwargs): """ Версия с подробными сообщениями об ошибках для поручительств """ engine = get_db_engine() # Шаг 1: Проверяем данные в таблице sigma_gk try: with engine.connect() as conn: # Проверяем существование таблицы sigma_gk table_exists = conn.execute(""" SELECT EXISTS( SELECT 1 FROM information_schema.tables WHERE table_name = 'sigma_gk' AND table_schema = 'public' ) """).scalar() if not table_exists: return "ERROR: Таблица sigma_gk не существует" # Проверяем, есть ли данные в таблице sigma_gk has_data = conn.execute(""" SELECT EXISTS(SELECT 1 FROM sigma_gk LIMIT 1) """).scalar() if not has_data: return "ERROR: Таблица sigma_gk пустая" # Проверяем структуру таблицы sigma_gk columns_check = conn.execute(""" SELECT COUNT(CASE WHEN column_name = 'poruchitel_name' THEN 1 END) as has_poruchitel_name, COUNT(CASE WHEN column_name = 'is_active' THEN 1 END) as has_is_active FROM information_schema.columns WHERE table_name = 'sigma_gk' AND table_schema = 'public' """).fetchone() has_poruchitel_name, has_is_active = columns_check if not has_poruchitel_name: return "ERROR: В таблице sigma_gk отсутствует колонка 'poruchitel_name'" if not has_is_active: return "ERROR: В таблице sigma_gk отсутствует колонка 'is_active'" # Проверяем, есть ли активные компании active_companies_count = conn.execute(""" SELECT COUNT(DISTINCT poruchitel_name) FROM sigma_gk WHERE is_active = '1' AND poruchitel_name IS NOT NULL AND poruchitel_name != '' """).scalar() if active_companies_count == 0: # Проверяем, есть ли вообще данные в колонке poruchitel_name any_companies = conn.execute(""" SELECT COUNT(DISTINCT poruchitel_name) FROM sigma_gk WHERE poruchitel_name IS NOT NULL AND poruchitel_name != '' """).scalar() if any_companies == 0: return "ERROR: В таблице sigma_gk нет данных в колонке poruchitel_name" else: return "ERROR: В таблице sigma_gk нет активных компаний (is_active = '1')" # Получаем список активных компаний для информации active_companies = conn.execute(""" SELECT DISTINCT poruchitel_name FROM sigma_gk WHERE is_active = '1' AND poruchitel_name IS NOT NULL AND poruchitel_name != '' ORDER BY poruchitel_name LIMIT 10 """).fetchall() companies_list = [row[0] for row in active_companies] except Exception as e: return f"ERROR: Ошибка при проверке таблицы sigma_gk: {str(e)}" # Шаг 2: Проверяем существование необходимых функций try: with engine.connect() as conn: # Проверяем существование функции update_poruchitelstva_system func_exists = conn.execute(""" SELECT EXISTS( SELECT 1 FROM pg_proc JOIN pg_namespace ON pg_proc.pronamespace = pg_namespace.oid WHERE proname = 'update_poruchitelstva_system' AND nspname = 'public' ) """).scalar() if not func_exists: return "ERROR: Функция update_poruchitelstva_system() не существует" # Проверяем существование функции rebuild_poruchitelstva_table func_exists = conn.execute(""" SELECT EXISTS( SELECT 1 FROM pg_proc JOIN pg_namespace ON pg_proc.pronamespace = pg_namespace.oid WHERE proname = 'rebuild_poruchitelstva_table' AND nspname = 'public' ) """).scalar() if not func_exists: return "ERROR: Функция rebuild_poruchitelstva_table() не существует" # Проверяем существование таблицы poruchitelstva table_exists = conn.execute(""" SELECT EXISTS( SELECT 1 FROM information_schema.tables WHERE table_name = 'poruchitelstva' AND table_schema = 'public' ) """).scalar() if not table_exists: print("WARNING: Таблица poruchitelstva не существует, будет создана") except Exception as e: return f"ERROR: Ошибка при проверке функций: {str(e)}" # Шаг 3: Обновляем систему поручительств try: with engine.begin() as conn: # Сначала обновляем конфигурацию компаний update_result = conn.execute( "SELECT update_poruchitelstva_system()" ).scalar() if not update_result: return "ERROR: Функция update_poruchitelstva_system() вернула пустой результат" # Затем пересоздаем таблицу conn.execute("SELECT rebuild_poruchitelstva_table()") # Проверяем результат check_result = conn.execute(""" SELECT (SELECT COUNT(*) FROM poruchitelstva) as main_table_count, (SELECT COUNT(DISTINCT column_name) - 7 FROM information_schema.columns WHERE table_name = 'poruchitelstva' AND table_schema = 'public') as company_columns_count """).fetchone() main_count, company_cols = check_result result_message = ( f"SUCCESS: Система поручительств обновлена.\n" f"• Активных компаний: {active_companies_count}\n" f"• Пример компаний: {', '.join(companies_list[:3])}{'...' if len(companies_list) > 3 else ''}\n" f"• Записей в основной таблице: {main_count or 0}\n" f"• Колонок компаний в таблице: {company_cols or 0}" ) return result_message except Exception as e: error_msg = str(e) # Проверяем, есть ли конкретные ошибки if "duplicate key" in error_msg.lower(): return f"ERROR: Обнаружены дубликаты ключей. {error_msg}" elif "unique constraint" in error_msg.lower(): return f"ERROR: Нарушение уникальности. {error_msg}" elif "ambiguous column" in error_msg.lower(): return f"ERROR: Неоднозначное имя колонки. {error_msg}" elif "function does not exist" in error_msg.lower(): return f"ERROR: Отсутствует необходимая функция. {error_msg}" else: return f"ERROR: Ошибка при обновлении системы поручительств: {error_msg}" with DAG( dag_id='update_poruchitelstva', default_args=default_args, description='Обновление списка погашений', schedule_interval=None, catchup=False, tags=['sigma'], ) as dag: poruchitelstva_task = PythonOperator( task_id='update_poruchitelstva_structure_with_detailed_errors', python_callable=update_poruchitelstva_structure_with_detailed_errors, provide_context=True ) poruchitelstva_task