sigma/dags/poruchitelstva.py

236 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import requests
import json
import pandas as pd
import numpy as np
from datetime import datetime
from requests.auth import HTTPBasicAuth
from sqlalchemy import create_engine, text
from airflow import DAG
from airflow.operators.python import PythonOperator
import logging
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 1, 1),
'retries': 1,
}
def get_db_engine():
"""Создает подключение к PostgreSQL"""
DF_CONFIG = {
'dbname': "postgres",
'user': "postgres",
'password': "4a00d4b90cd830da0796",
'host': "postgresql",
'port': "5432"
}
return create_engine(
f"postgresql+psycopg2://{DF_CONFIG['user']}:{DF_CONFIG['password']}@"
f"{DF_CONFIG['host']}:{DF_CONFIG['port']}/{DF_CONFIG['dbname']}",
pool_size=10,
max_overflow=20
)
def update_poruchitelstva_structure_with_detailed_errors(**kwargs):
"""
Версия с подробными сообщениями об ошибках для поручительств
"""
engine = get_db_engine()
# Шаг 1: Проверяем данные в таблице sigma_gk
try:
with engine.connect() as conn:
# Проверяем существование таблицы sigma_gk
table_exists = conn.execute("""
SELECT EXISTS(
SELECT 1 FROM information_schema.tables
WHERE table_name = 'sigma_gk'
AND table_schema = 'public'
)
""").scalar()
if not table_exists:
return "ERROR: Таблица sigma_gk не существует"
# Проверяем, есть ли данные в таблице sigma_gk
has_data = conn.execute("""
SELECT EXISTS(SELECT 1 FROM sigma_gk LIMIT 1)
""").scalar()
if not has_data:
return "ERROR: Таблица sigma_gk пустая"
# Проверяем структуру таблицы sigma_gk
columns_check = conn.execute("""
SELECT
COUNT(CASE WHEN column_name = 'poruchitel_name' THEN 1 END) as has_poruchitel_name,
COUNT(CASE WHEN column_name = 'is_active' THEN 1 END) as has_is_active
FROM information_schema.columns
WHERE table_name = 'sigma_gk'
AND table_schema = 'public'
""").fetchone()
has_poruchitel_name, has_is_active = columns_check
if not has_poruchitel_name:
return "ERROR: В таблице sigma_gk отсутствует колонка 'poruchitel_name'"
if not has_is_active:
return "ERROR: В таблице sigma_gk отсутствует колонка 'is_active'"
# Проверяем, есть ли активные компании
active_companies_count = conn.execute("""
SELECT COUNT(DISTINCT poruchitel_name)
FROM sigma_gk
WHERE is_active = '1'
AND poruchitel_name IS NOT NULL
AND poruchitel_name != ''
""").scalar()
if active_companies_count == 0:
# Проверяем, есть ли вообще данные в колонке poruchitel_name
any_companies = conn.execute("""
SELECT COUNT(DISTINCT poruchitel_name)
FROM sigma_gk
WHERE poruchitel_name IS NOT NULL
AND poruchitel_name != ''
""").scalar()
if any_companies == 0:
return "ERROR: В таблице sigma_gk нет данных в колонке poruchitel_name"
else:
return "ERROR: В таблице sigma_gk нет активных компаний (is_active = '1')"
# Получаем список активных компаний для информации
active_companies = conn.execute("""
SELECT DISTINCT poruchitel_name
FROM sigma_gk
WHERE is_active = '1'
AND poruchitel_name IS NOT NULL
AND poruchitel_name != ''
ORDER BY poruchitel_name
LIMIT 10
""").fetchall()
companies_list = [row[0] for row in active_companies]
except Exception as e:
return f"ERROR: Ошибка при проверке таблицы sigma_gk: {str(e)}"
# Шаг 2: Проверяем существование необходимых функций
try:
with engine.connect() as conn:
# Проверяем существование функции update_poruchitelstva_system
func_exists = conn.execute("""
SELECT EXISTS(
SELECT 1 FROM pg_proc
JOIN pg_namespace ON pg_proc.pronamespace = pg_namespace.oid
WHERE proname = 'update_poruchitelstva_system'
AND nspname = 'public'
)
""").scalar()
if not func_exists:
return "ERROR: Функция update_poruchitelstva_system() не существует"
# Проверяем существование функции rebuild_poruchitelstva_table
func_exists = conn.execute("""
SELECT EXISTS(
SELECT 1 FROM pg_proc
JOIN pg_namespace ON pg_proc.pronamespace = pg_namespace.oid
WHERE proname = 'rebuild_poruchitelstva_table'
AND nspname = 'public'
)
""").scalar()
if not func_exists:
return "ERROR: Функция rebuild_poruchitelstva_table() не существует"
# Проверяем существование таблицы poruchitelstva
table_exists = conn.execute("""
SELECT EXISTS(
SELECT 1 FROM information_schema.tables
WHERE table_name = 'poruchitelstva'
AND table_schema = 'public'
)
""").scalar()
if not table_exists:
print("WARNING: Таблица poruchitelstva не существует, будет создана")
except Exception as e:
return f"ERROR: Ошибка при проверке функций: {str(e)}"
# Шаг 3: Обновляем систему поручительств
try:
with engine.begin() as conn:
# Сначала обновляем конфигурацию компаний
update_result = conn.execute(
"SELECT update_poruchitelstva_system()"
).scalar()
if not update_result:
return "ERROR: Функция update_poruchitelstva_system() вернула пустой результат"
# Затем пересоздаем таблицу
conn.execute("SELECT rebuild_poruchitelstva_table()")
# Проверяем результат
check_result = conn.execute("""
SELECT
(SELECT COUNT(*) FROM poruchitelstva) as main_table_count,
(SELECT COUNT(*) FROM poruchitelstva_normalized) as normalized_count,
(SELECT COUNT(DISTINCT column_name) - 7
FROM information_schema.columns
WHERE table_name = 'poruchitelstva'
AND table_schema = 'public') as company_columns_count
""").fetchone()
main_count, normalized_count, company_cols = check_result
result_message = (
f"SUCCESS: Система поручительств обновлена.\n"
f"• Активных компаний: {active_companies_count}\n"
f"• Пример компаний: {', '.join(companies_list[:3])}{'...' if len(companies_list) > 3 else ''}\n"
f"• Записей в основной таблице: {main_count or 0}\n"
f"• Записей в нормализованной таблице: {normalized_count or 0}\n"
f"• Колонок компаний в таблице: {company_cols or 0}"
)
if normalized_count == 0:
result_message += "\nWARNING: Нормализованная таблица пуста. Возможно, нет данных о поручительствах."
return result_message
except Exception as e:
error_msg = str(e)
# Проверяем, есть ли конкретные ошибки
if "duplicate key" in error_msg.lower():
return f"ERROR: Обнаружены дубликаты ключей. {error_msg}"
elif "unique constraint" in error_msg.lower():
return f"ERROR: Нарушение уникальности. {error_msg}"
elif "ambiguous column" in error_msg.lower():
return f"ERROR: Неоднозначное имя колонки. {error_msg}"
elif "function does not exist" in error_msg.lower():
return f"ERROR: Отсутствует необходимая функция. {error_msg}"
else:
return f"ERROR: Ошибка при обновлении системы поручительств: {error_msg}"
with DAG(
dag_id='update_poruchitelstva',
default_args=default_args,
description='Обновление списка погашений',
schedule_interval=None,
catchup=False,
tags=['sigma'],
) as dag:
poruchitelstva_task = PythonOperator(
task_id='update_poruchitelstva_structure_with_detailed_errors',
python_callable=update_poruchitelstva_structure_with_detailed_errors,
provide_context=True
)
poruchitelstva_task