diff --git a/dags/poruchitelstva.py b/dags/poruchitelstva.py new file mode 100644 index 0000000..d9e29cd --- /dev/null +++ b/dags/poruchitelstva.py @@ -0,0 +1,138 @@ +import requests +import json +import pandas as pd +import numpy as np +from datetime import datetime +from requests.auth import HTTPBasicAuth +from sqlalchemy import create_engine, text +from airflow import DAG +from airflow.operators.python import PythonOperator +import logging + +default_args = { + 'owner': 'airflow', + 'depends_on_past': False, + 'start_date': datetime(2023, 1, 1), + 'retries': 1, +} + +def get_db_engine(): + """Создает подключение к PostgreSQL""" + DF_CONFIG = { + 'dbname': "postgres", + 'user': "postgres", + 'password': "4a00d4b90cd830da0796", + 'host': "postgresql", + 'port': "5432" + } + return create_engine( + f"postgresql+psycopg2://{DF_CONFIG['user']}:{DF_CONFIG['password']}@" + f"{DF_CONFIG['host']}:{DF_CONFIG['port']}/{DF_CONFIG['dbname']}", + pool_size=10, + max_overflow=20 + ) + +def update_pogasheniya_structure_with_detailed_errors(**kwargs): + """ + Версия с подробными сообщениями об ошибках + """ + engine = get_db_engine() + + # Шаг 1: Проверяем существование функции get_years() + try: + with engine.connect() as conn: + # Проверяем, существует ли функция + func_exists = conn.execute(""" + SELECT EXISTS( + SELECT 1 FROM pg_proc + JOIN pg_namespace ON pg_proc.pronamespace = pg_namespace.oid + WHERE proname = 'get_years' + AND nspname = 'public' + ) + """).scalar() + + if not func_exists: + return "ERROR: Функция get_years() не существует в схеме public" + + # Проверяем существование таблицы years_for_pogasheniya + table_exists = conn.execute(""" + SELECT EXISTS( + SELECT 1 FROM information_schema.tables + WHERE table_name = 'years_for_pogasheniya' + AND table_schema = 'public' + ) + """).scalar() + + if not table_exists: + return "ERROR: Таблица years_for_pogasheniya не существует" + + # Проверяем, есть ли данные в таблице + has_data = conn.execute(""" + SELECT EXISTS(SELECT 1 FROM years_for_pogasheniya LIMIT 1) + """).scalar() + + if not has_data: + return "ERROR: Таблица years_for_pogasheniya пустая" + + # Вызываем функцию get_years() + result = conn.execute("SELECT * FROM get_years()").fetchone() + + if result is None: + return "ERROR: Функция get_years() вернула NULL. Проверьте данные в таблице" + + start_year, end_year = result + + # Детальная проверка значений + error_messages = [] + + if start_year is None: + error_messages.append("start_year is NULL") + if end_year is None: + error_messages.append("end_year is NULL") + + if error_messages: + return f"ERROR: {', '.join(error_messages)}" + + # Преобразуем в числа с проверкой + try: + start_year_int = int(start_year) + end_year_int = int(end_year) + except ValueError: + return f"ERROR: Невозможно преобразовать годы в числа: start_year='{start_year}', end_year='{end_year}'" + + if start_year_int > end_year_int: + return f"ERROR: Неправильный порядок лет: start_year ({start_year}) > end_year ({end_year})" + + except Exception as e: + return f"ERROR: Ошибка при проверке данных: {str(e)}" + + # Шаг 2: Обновляем структуру + try: + with engine.begin() as conn: + update_result = conn.execute( + "SELECT update_years_and_rebuild(%s, %s)", + (str(start_year), str(end_year)) + ).scalar() + + return f"SUCCESS: {update_result}" + + except Exception as e: + return f"ERROR: Ошибка при пересоздании таблицы: {str(e)}" + + +with DAG( + dag_id='update_pogasheniya', + default_args=default_args, + description='Обновление списка погашений', + schedule_interval=None, + catchup=False, + tags=['sigma'], +) as dag: + + pogasheniya_task = PythonOperator( + task_id='update_pogasheniya_structure_with_detailed_errors', + python_callable=update_pogasheniya_structure_with_detailed_errors, + provide_context=True + ) + +pogasheniya_task \ No newline at end of file