sigma/dags/poruchitelstva.py

138 lines
5.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import requests
import json
import pandas as pd
import numpy as np
from datetime import datetime
from requests.auth import HTTPBasicAuth
from sqlalchemy import create_engine, text
from airflow import DAG
from airflow.operators.python import PythonOperator
import logging
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 1, 1),
'retries': 1,
}
def get_db_engine():
"""Создает подключение к PostgreSQL"""
DF_CONFIG = {
'dbname': "postgres",
'user': "postgres",
'password': "4a00d4b90cd830da0796",
'host': "postgresql",
'port': "5432"
}
return create_engine(
f"postgresql+psycopg2://{DF_CONFIG['user']}:{DF_CONFIG['password']}@"
f"{DF_CONFIG['host']}:{DF_CONFIG['port']}/{DF_CONFIG['dbname']}",
pool_size=10,
max_overflow=20
)
def update_pogasheniya_structure_with_detailed_errors(**kwargs):
"""
Версия с подробными сообщениями об ошибках
"""
engine = get_db_engine()
# Шаг 1: Проверяем существование функции get_years()
try:
with engine.connect() as conn:
# Проверяем, существует ли функция
func_exists = conn.execute("""
SELECT EXISTS(
SELECT 1 FROM pg_proc
JOIN pg_namespace ON pg_proc.pronamespace = pg_namespace.oid
WHERE proname = 'get_years'
AND nspname = 'public'
)
""").scalar()
if not func_exists:
return "ERROR: Функция get_years() не существует в схеме public"
# Проверяем существование таблицы years_for_pogasheniya
table_exists = conn.execute("""
SELECT EXISTS(
SELECT 1 FROM information_schema.tables
WHERE table_name = 'years_for_pogasheniya'
AND table_schema = 'public'
)
""").scalar()
if not table_exists:
return "ERROR: Таблица years_for_pogasheniya не существует"
# Проверяем, есть ли данные в таблице
has_data = conn.execute("""
SELECT EXISTS(SELECT 1 FROM years_for_pogasheniya LIMIT 1)
""").scalar()
if not has_data:
return "ERROR: Таблица years_for_pogasheniya пустая"
# Вызываем функцию get_years()
result = conn.execute("SELECT * FROM get_years()").fetchone()
if result is None:
return "ERROR: Функция get_years() вернула NULL. Проверьте данные в таблице"
start_year, end_year = result
# Детальная проверка значений
error_messages = []
if start_year is None:
error_messages.append("start_year is NULL")
if end_year is None:
error_messages.append("end_year is NULL")
if error_messages:
return f"ERROR: {', '.join(error_messages)}"
# Преобразуем в числа с проверкой
try:
start_year_int = int(start_year)
end_year_int = int(end_year)
except ValueError:
return f"ERROR: Невозможно преобразовать годы в числа: start_year='{start_year}', end_year='{end_year}'"
if start_year_int > end_year_int:
return f"ERROR: Неправильный порядок лет: start_year ({start_year}) > end_year ({end_year})"
except Exception as e:
return f"ERROR: Ошибка при проверке данных: {str(e)}"
# Шаг 2: Обновляем структуру
try:
with engine.begin() as conn:
update_result = conn.execute(
"SELECT update_years_and_rebuild(%s, %s)",
(str(start_year), str(end_year))
).scalar()
return f"SUCCESS: {update_result}"
except Exception as e:
return f"ERROR: Ошибка при пересоздании таблицы: {str(e)}"
with DAG(
dag_id='update_pogasheniya',
default_args=default_args,
description='Обновление списка погашений',
schedule_interval=None,
catchup=False,
tags=['sigma'],
) as dag:
pogasheniya_task = PythonOperator(
task_id='update_pogasheniya_structure_with_detailed_errors',
python_callable=update_pogasheniya_structure_with_detailed_errors,
provide_context=True
)
pogasheniya_task