import requests import json import pandas as pd import numpy as np from datetime import datetime from requests.auth import HTTPBasicAuth from sqlalchemy import create_engine, text from airflow import DAG from airflow.operators.python import PythonOperator import logging default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2023, 1, 1), 'retries': 1, } def get_db_engine(): """Создает подключение к PostgreSQL""" DF_CONFIG = { 'dbname': "postgres", 'user': "postgres", 'password': "4a00d4b90cd830da0796", 'host': "postgresql", 'port': "5432" } return create_engine( f"postgresql+psycopg2://{DF_CONFIG['user']}:{DF_CONFIG['password']}@" f"{DF_CONFIG['host']}:{DF_CONFIG['port']}/{DF_CONFIG['dbname']}", pool_size=10, max_overflow=20 ) def update_poruchitelstva_structure_with_detailed_errors(**kwargs): """ Версия с подробными сообщениями об ошибках для поручительств """ engine = get_db_engine() # Шаг 1: Проверяем существование необходимых объектов try: with engine.connect() as conn: # Проверяем существование функции get_active_companies() func_exists = conn.execute(""" SELECT EXISTS( SELECT 1 FROM pg_proc JOIN pg_namespace ON pg_proc.pronamespace = pg_namespace.oid WHERE proname = 'get_active_companies' AND nspname = 'public' ) """).scalar() if not func_exists: return "ERROR: Функция get_active_companies() не существует в схеме public" # Проверяем существование таблицы sigma_gk table_exists = conn.execute(""" SELECT EXISTS( SELECT 1 FROM information_schema.tables WHERE table_name = 'sigma_gk' AND table_schema = 'public' ) """).scalar() if not table_exists: return "ERROR: Таблица sigma_gk не существует" # Проверяем, есть ли активные компании active_count = conn.execute(""" SELECT COUNT(*) FROM sigma_gk WHERE is_active = '1' AND poruchitel_name IS NOT NULL AND poruchitel_name != '' """).scalar() if active_count == 0: # Получаем список всех компаний для отладки all_companies = conn.execute(""" SELECT name, inn, is_active, poruchitel_name FROM sigma_gk LIMIT 10 """).fetchall() companies_info = "\n".join([ f"{row[0]} (ИНН: {row[1]}, active: {row[2]}, poruchitel: {row[3]})" for row in all_companies ]) return f"ERROR: Нет активных компаний в sigma_gk\n\nДоступные компании:\n{companies_info}" # Получаем список активных компаний для информации active_companies = conn.execute(""" SELECT poruchitel_name FROM sigma_gk WHERE is_active = '1' AND poruchitel_name IS NOT NULL AND poruchitel_name != '' ORDER BY poruchitel_name """).fetchall() companies_list = ", ".join([row[0] for row in active_companies]) except Exception as e: return f"ERROR: Ошибка при проверке данных: {str(e)}" # Шаг 2: Обновляем структуру таблицы poruchitelstva try: with engine.begin() as conn: # Вызываем функцию обновления update_result = conn.execute( "SELECT update_companies_and_rebuild()" ).scalar() # Добавляем информацию о компаниях в результат result_message = f"{update_result}\n\nАктивные компании: {companies_list}\nВсего активных компаний: {active_count}" return f"SUCCESS: {result_message}" except Exception as e: error_detail = str(e) # Дополнительная диагностика при ошибках diagnostic_info = "" try: with engine.connect() as conn2: # Проверяем существование таблицы poruchitelstva por_table_exists = conn2.execute(""" SELECT EXISTS( SELECT 1 FROM information_schema.tables WHERE table_name = 'poruchitelstva' AND table_schema = 'public' ) """).scalar() if por_table_exists: # Проверяем структуру таблицы columns = conn2.execute(""" SELECT column_name, data_type FROM information_schema.columns WHERE table_name = 'poruchitelstva' AND table_schema = 'public' ORDER BY ordinal_position """).fetchall() columns_list = "\n".join([f" - {row[0]} ({row[1]})" for row in columns]) diagnostic_info = f"\n\nТекущая структура poruchitelstva:\n{columns_list}" else: diagnostic_info = "\n\nТаблица poruchitelstva не существует" except Exception as diag_error: diagnostic_info = f"\n\nНе удалось получить диагностическую информацию: {str(diag_error)}" return f"ERROR: Ошибка при пересоздании таблицы: {error_detail}{diagnostic_info}" with DAG( dag_id='update_poruchitelstva', default_args=default_args, description='Обновление таблицы поручительств', schedule_interval=None, catchup=False, tags=['sigma', 'poruchitelstva'], ) as dag: poruchitelstva_task = PythonOperator( task_id='update_poruchitelstva_structure', python_callable=update_poruchitelstva_structure_with_detailed_errors, provide_context=True ) poruchitelstva_task