Обновить dags/OSV.py

This commit is contained in:
bn_user 2025-09-05 12:57:43 +00:00
parent 2d9695eff3
commit b8530141cb
1 changed files with 58 additions and 87 deletions

View File

@ -71,96 +71,67 @@ def read_data_1C(**kwargs):
data_from_1c = response.json() data_from_1c = response.json()
df = pd.DataFrame(data_from_1c['data']) df = pd.DataFrame(data_from_1c['data'])
# engine = get_db_engine() engine = get_db_engine()
# table_name = 'oborotno_salbdovaya_vedomostb' table_name = 'oborotno_salbdovaya_vedomostb'
# field_mapping = { field_mapping = {
# # Основные поля # Основные поля
# 'Счет': 'schet', 'Счет': 'schet',
# 'Субконто1': 'subkonto1', 'Субконто1': 'subkonto1',
# 'Субконто2': 'subkonto2', 'Субконто2': 'subkonto2',
# # 'Субконто3': 'subkonto3', 'Организация': 'organizaciya',
# 'Организация': 'organizaciya', 'Контрагент': 'kontagent',
# # 'Валюта': 'valyuta', 'НомерДоговора': 'nomer',
'ДатаДоговора': 'date_bedin',
'СрокДействияДоговора': 'date_end',
# # Суммовые остатки и обороты # Суммовые остатки и обороты
# # 'СуммаНачальныйОстаток': 'summa_nachalnyy_ostatok', 'СуммаОборот': 'summa_oborot',
# # 'СуммаНачальныйОстатокДт': 'summa_nachalnyy_ostatok_dt', 'СуммаОборотДт': 'summa_oborot_dt',
# # 'СуммаНачальныйОстатокКт': 'summa_nachalnyy_ostatok_kt', 'СуммаОборотКт': 'summa_oborot_kt',
# # 'СуммаНачальныйРазвернутыйОстатокДт': 'summa_nachalnyy_razvernutyy_ostatok_dt', 'СуммаКонечныйОстаток': 'summa_konechnyy_ostatok',
# # 'СуммаНачальныйРазвернутыйОстатокКт': 'summa_nachalnyy_razvernutyy_ostatok_kt', 'СуммаКонечныйОстатокДт': 'summa_konechnyy_ostatok_dt',
# 'СуммаОборот': 'summa_oborot', 'СуммаКонечныйОстатокКт': 'summa_konechnyy_ostatok_kt',
# 'СуммаОборотДт': 'summa_oborot_dt', 'СуммаКонечныйРазвернутыйОстатокДт': 'summa_konechnyy_razvernutyy_ostatok_dt',
# 'СуммаОборотКт': 'summa_oborot_kt', 'СуммаКонечныйРазвернутыйОстатокКт': 'summa_konechnyy_razvernutyy_ostatok_kt'
# 'СуммаКонечныйОстаток': 'summa_konechnyy_ostatok', }
# 'СуммаКонечныйОстатокДт': 'summa_konechnyy_ostatok_dt',
# 'СуммаКонечныйОстатокКт': 'summa_konechnyy_ostatok_kt',
# 'СуммаКонечныйРазвернутыйОстатокДт': 'summa_konechnyy_razvernutyy_ostatok_dt',
# 'СуммаКонечныйРазвернутыйОстатокКт': 'summa_konechnyy_razvernutyy_ostatok_kt'
# # Валютные остатки и обороты df = df.rename(columns=field_mapping)
# # 'ВалютнаяСуммаНачальныйОстаток': 'valyutnaya_summa_nachalnyy_ostatok', with engine.begin() as conn:
# # 'ВалютнаяСуммаНачальныйОстатокДт': 'valyutnaya_summa_nachalnyy_ostatok_dt', if not df.empty:
# # 'ВалютнаяСуммаНачальныйОстатокКт': 'valyutnaya_summa_nachalnyy_ostatok_kt', conn.execute(f"CREATE TEMP TABLE temp_{table_name} AS SELECT * FROM public.{table_name} WHERE 1 = 0")
# # 'ВалютнаяСуммаНачальныйРазвернутыйОстатокДт': 'valyutnaya_summa_nachalnyy_razvernutyy_ostatok_dt', df.to_sql(
# # 'ВалютнаяСуммаНачальныйРазвернутыйОстатокКт': 'valyutnaya_summa_nachalnyy_razvernutyy_ostatok_kt', f'temp_{table_name}',
# # 'ВалютнаяСуммаОборот': 'valyutnaya_summa_oborot', con=conn,
# # 'ВалютнаяСуммаОборотДт': 'valyutnaya_summa_oborot_dt', if_exists='append',
# # 'ВалютнаяСуммаОборотКт': 'valyutnaya_summa_oborot_kt', index=False,
# # 'ВалютнаяСуммаКонечныйОстаток': 'valyutnaya_summa_konechnyy_ostatok', method='multi'
# # 'ВалютнаяСуммаКонечныйОстатокДт': 'valyutnaya_summa_konechnyy_ostatok_dt', )
# # 'ВалютнаяСуммаКонечныйОстатокКт': 'valyutnaya_summa_konechnyy_ostatok_kt', conn.execute(f"DELETE FROM public.{table_name} where get_date = CURRENT_DATE::date::timestamp")
# # 'ВалютнаяСуммаКонечныйРазвернутыйОстатокДт': 'valyutnaya_summa_konechnyy_razvernutyy_ostatok_dt', conn.execute(f"""
# # 'ВалютнаяСуммаКонечныйРазвернутыйОстатокКт': 'valyutnaya_summa_konechnyy_razvernutyy_ostatok_kt', INSERT INTO public.{table_name}
SELECT
# # Количественные остатки и обороты schet
# # 'КоличествоНачальныйОстаток': 'kolichestvo_nachalnyy_ostatok', , subkonto1
# # 'КоличествоНачальныйОстатокДт': 'kolichestvo_nachalnyy_ostatok_dt', , subkonto2
# # 'КоличествоНачальныйОстатокКт': 'kolichestvo_nachalnyy_ostatok_kt', , organizaciya
# # 'КоличествоНачальныйРазвернутыйОстатокДт': 'kolichestvo_nachalnyy_razvernutyy_ostatok_dt', , kontagent
# # 'КоличествоНачальныйРазвернутыйОстатокКт': 'kolichestvo_nachalnyy_razvernutyy_ostatok_kt', , nomer
# # 'КоличествоОборот': 'kolichestvo_oborot', , date_bedin
# # 'КоличествоОборотДт': 'kolichestvo_oborot_dt', , date_end
# # 'КоличествоОборотКт': 'kolichestvo_oborot_kt', , summa_oborot
# # 'КоличествоКонечныйОстаток': 'kolichestvo_konechnyy_ostatok', , summa_oborot_dt
# # 'КоличествоКонечныйОстатокДт': 'kolichestvo_konechnyy_ostatok_dt', , summa_oborot_kt
# # 'КоличествоКонечныйОстатокКт': 'kolichestvo_konechnyy_ostatok_kt', , summa_konechnyy_ostatok
# # 'КоличествоКонечныйРазвернутыйОстатокДт': 'kolichestvo_konechnyy_razvernutyy_ostatok_dt', , summa_konechnyy_ostatok_dt
# # 'КоличествоКонечныйРазвернутыйОстатокКт': 'kolichestvo_konechnyy_razvernutyy_ostatok_kt' , summa_konechnyy_ostatok_kt
# } , summa_konechnyy_razvernutyy_ostatok_dt
, summa_konechnyy_razvernutyy_ostatok_kt
# df = df.rename(columns=field_mapping) , CURRENT_DATE::date::timestamp
# with engine.begin() as conn: FROM temp_{table_name}
# if not df.empty: --ON CONFLICT (schet, subkonto1, subkonto2, organizaciya)
# conn.execute(f"CREATE TEMP TABLE temp_{table_name} AS SELECT * FROM public.{table_name} WHERE 1 = 0") """
# df.to_sql( )
# f'temp_{table_name}',
# con=conn,
# if_exists='append',
# index=False,
# method='multi'
# )
# conn.execute(f"DELETE FROM public.{table_name} where get_date = CURRENT_DATE::date::timestamp")
# conn.execute(f"""
# INSERT INTO public.{table_name}
# SELECT
# schet
# , subkonto1
# , subkonto2
# , organizaciya
# , summa_oborot
# , summa_oborot_dt
# , summa_oborot_kt
# , summa_konechnyy_ostatok
# , summa_konechnyy_ostatok_dt
# , summa_konechnyy_ostatok_kt
# , summa_konechnyy_razvernutyy_ostatok_dt
# , summa_konechnyy_razvernutyy_ostatok_kt
# , CURRENT_DATE::date::timestamp
# FROM temp_{table_name}
# --ON CONFLICT (schet, subkonto1, subkonto2, organizaciya)
# """
# )
with DAG( with DAG(
dag_id='data_download_from_1C_source', dag_id='data_download_from_1C_source',