From ec842fe08855c886404325db0063547cea4bf9fb Mon Sep 17 00:00:00 2001 From: bn_user Date: Tue, 18 Nov 2025 17:09:06 +0000 Subject: [PATCH] =?UTF-8?q?=D0=9E=D0=B1=D0=BD=D0=BE=D0=B2=D0=B8=D1=82?= =?UTF-8?q?=D1=8C=20dags/OSV.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dags/OSV.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/dags/OSV.py b/dags/OSV.py index bc7cc75..066e8f3 100644 --- a/dags/OSV.py +++ b/dags/OSV.py @@ -134,6 +134,32 @@ def read_data_1C(**kwargs): } df = df.rename(columns=field_mapping) + # Удаляем дубликаты по ключевым полям перед вставкой + conflict_columns = ['schet', 'uid_subkonto2', 'uid_subkonto1', 'uid_organizaciya', 'get_date'] + + print(f" Данные перед удалением дубликатов (первые 10 строк):") + print(f" Всего строк: {len(df)}") + print(f" Ключевые колонки для проверки дубликатов: {conflict_columns}") + + # Выводим первые 10 строк с ключевыми полями + if not df.empty: + display_columns = conflict_columns + ['summa_oborot', 'nomer'] # Добавляем еще пару полей для информации + available_columns = [col for col in display_columns if col in df.columns] + + print(f" Первые 10 строк (только ключевые поля):") + print(df[available_columns].head(10).to_string(index=False)) + + # Проверяем наличие дубликатов + duplicates = df.duplicated(subset=conflict_columns, keep=False) + if duplicates.any(): + duplicate_count = duplicates.sum() + print(f" Найдено дубликатов: {duplicate_count}") + print(f" Пример дублирующихся строк:") + duplicate_samples = df[duplicates][available_columns].head(5) + print(duplicate_samples.to_string(index=False)) + else: + print(f" Дубликатов не найдено") + with engine.begin() as conn: if not df.empty: conn.execute(f"DROP TABLE IF EXISTS {temp_table_name}")