Удалить dags/update_dicts.py

This commit is contained in:
bn_user 2025-11-13 15:57:23 +00:00
parent 0bc5923ab8
commit 893e36809e
1 changed files with 0 additions and 258 deletions

View File

@ -1,258 +0,0 @@
import requests
import json
import pandas as pd
import numpy as np
from datetime import datetime
from requests.auth import HTTPBasicAuth
from sqlalchemy import create_engine
from airflow import DAG
from airflow.operators.python import PythonOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 1, 1),
'retries': 1,
}
def get_db_engine():
"""Создает подключение к PostgreSQL"""
DF_CONFIG = {
'dbname': "postgres",
'user': "postgres",
'password': "4a00d4b90cd830da0796",
'host': "postgresql",
'port': "5432"
}
return create_engine(
f"postgresql+psycopg2://{DF_CONFIG['user']}:{DF_CONFIG['password']}@"
f"{DF_CONFIG['host']}:{DF_CONFIG['port']}/{DF_CONFIG['dbname']}",
pool_size=10,
max_overflow=20
)
def take_dogovor(**kwargs):
params = {"СписокСчетов": ["66","66.01","66.02", "66.03","66.04","66.21","66.22","66.23","66.24", "67","67.01","67.02", "67.03","67.04","67.21","67.22","67.23","67.24"]}
query = """ВЫБРАТЬ РАЗЛИЧНЫЕ
ОстаткиОбороты.Субконто2.Наименование КАК Наименование,
ОстаткиОбороты.Субконто2.Номер КАК Номер,
ОстаткиОбороты.Субконто2.Дата КАК Дата,
ОстаткиОбороты.Субконто2.СрокДействия КАК СрокДействия,
UUID(ОстаткиОбороты.Субконто2.Ссылка) КАК ИдентификаторДоговора
ИЗ
РегистрБухгалтерии.Хозрасчетный.ОстаткиИОбороты КАК ОстаткиОбороты
ГДЕ
ОстаткиОбороты.Счет.Код В (&СписокСчетов)
И ОстаткиОбороты.Субконто2 ЕСТЬ НЕ NULL
"""
auth = HTTPBasicAuth('obmen', 'bOR2W7w4')
response = requests.post(
# http://адрес сервера/название БД/hs/services/query?ProcessPostedRequest
url=r'http://192.168.1.75/chupd/hs/services/query?ProcessPostedRequest',
json={"query":query, "params": params},
auth=auth,
verify=False
)
data_from_1c = response.json()
df = pd.DataFrame(data_from_1c['data'])
engine = get_db_engine()
table_name = 'dict_dogovor'
field_mapping = {
'Наименование': 'name',
'Номер': 'nomer',
'Дата': 'date_begin',
'СрокДействия': 'date_end',
'ИдентификаторДоговора': 'uid_dogovor'
}
df = df.rename(columns=field_mapping)
with engine.begin() as conn:
if not df.empty:
conn.execute(f"CREATE TEMP TABLE temp_{table_name} AS SELECT * FROM public.{table_name} WHERE 1 = 0")
df.to_sql(
f'temp_{table_name}',
con=conn,
if_exists='append',
index=False,
method='multi'
)
conn.execute(f"DELETE FROM public.{table_name}")
conn.execute(f"""
INSERT INTO public.{table_name} (name, nomer, date_begin, date_end, uid_dogovor)
SELECT
name,
nomer,
date_begin,
date_end,
uid_dogovor
FROM temp_{table_name}
ON CONFLICT (uid_dogovor)
DO UPDATE SET
name = EXCLUDED.name,
nomer = EXCLUDED.nomer,
date_begin = EXCLUDED.date_begin,
date_end = EXCLUDED.date_end
WHERE
public.{table_name}.name IS DISTINCT FROM EXCLUDED.name
OR public.{table_name}.nomer IS DISTINCT FROM EXCLUDED.nomer
OR public.{table_name}.date_begin IS DISTINCT FROM EXCLUDED.date_begin
OR public.{table_name}.date_end IS DISTINCT FROM EXCLUDED.date_end;
"""
)
def take_bank(**kwargs):
params = {"СписокСчетов": ["66","66.01","66.02", "66.03","66.04","66.21","66.22","66.23","66.24", "67","67.01","67.02", "67.03","67.04","67.21","67.22","67.23","67.24"]}
query = """ВЫБРАТЬ РАЗЛИЧНЫЕ
ОстаткиОбороты.Субконто1.Наименование КАК Наименование,
ОстаткиОбороты.Субконто1.ИНН КАК ИНН,
ОстаткиОбороты.Субконто1.КПП КАК КПП,
UUID(ОстаткиОбороты.Субконто1.Ссылка) КАК ИдентификаторКонтрагента
ИЗ
РегистрБухгалтерии.Хозрасчетный.ОстаткиИОбороты КАК ОстаткиОбороты
ГДЕ
ОстаткиОбороты.Счет.Код В (&СписокСчетов)
И ОстаткиОбороты.Субконто1 ЕСТЬ НЕ NULL
"""
auth = HTTPBasicAuth('obmen', 'bOR2W7w4')
response = requests.post(
# http://адрес сервера/название БД/hs/services/query?ProcessPostedRequest
url=r'http://192.168.1.75/chupd/hs/services/query?ProcessPostedRequest',
json={"query":query, "params": params},
auth=auth,
verify=False
)
data_from_1c = response.json()
df = pd.DataFrame(data_from_1c['data'])
engine = get_db_engine()
table_name = 'dict_bank'
field_mapping = {
'Наименование': 'name',
'ИНН': 'inn',
'КПП': 'kpp',
'ИдентификаторКонтрагента': 'uid_bank'
}
df = df.rename(columns=field_mapping)
with engine.begin() as conn:
if not df.empty:
conn.execute(f"CREATE TEMP TABLE temp_{table_name} AS SELECT * FROM public.{table_name} WHERE 1 = 0")
df.to_sql(
f'temp_{table_name}',
con=conn,
if_exists='append',
index=False,
method='multi'
)
conn.execute(f"DELETE FROM public.{table_name}")
conn.execute(f"""
INSERT INTO public.{table_name} (name, inn, kpp, uid_bank)
SELECT
name,
inn,
kpp,
uid_bank
FROM temp_{table_name}
ON CONFLICT (uid_bank)
DO UPDATE SET
name = EXCLUDED.name,
inn = EXCLUDED.inn,
kpp = EXCLUDED.kpp
WHERE
public.{table_name}.name IS DISTINCT FROM EXCLUDED.name
OR public.{table_name}.inn IS DISTINCT FROM EXCLUDED.inn
OR public.{table_name}.kpp IS DISTINCT FROM EXCLUDED.kpp
"""
)
def take_organizaciya(**kwargs):
params = {"СписокСчетов": ["66","66.01","66.02", "66.03","66.04","66.21","66.22","66.23","66.24", "67","67.01","67.02", "67.03","67.04","67.21","67.22","67.23","67.24"]}
query = """ВЫБРАТЬ РАЗЛИЧНЫЕ
ОстаткиОбороты.Организация.Наименование КАК Наименование,
ОстаткиОбороты.Организация.ИНН КАК ИНН,
ОстаткиОбороты.Организация.КПП КАК КПП,
UUID(ОстаткиОбороты.Организация.Ссылка) КАК ИдентификаторОрганизации
ИЗ
РегистрБухгалтерии.Хозрасчетный.ОстаткиИОбороты КАК ОстаткиОбороты
ГДЕ
ОстаткиОбороты.Счет.Код В (&СписокСчетов)
"""
auth = HTTPBasicAuth('obmen', 'bOR2W7w4')
response = requests.post(
# http://адрес сервера/название БД/hs/services/query?ProcessPostedRequest
url=r'http://192.168.1.75/chupd/hs/services/query?ProcessPostedRequest',
json={"query":query, "params": params},
auth=auth,
verify=False
)
data_from_1c = response.json()
df = pd.DataFrame(data_from_1c['data'])
engine = get_db_engine()
table_name = 'dict_organizaciya'
field_mapping = {
'Наименование': 'name',
'ИНН': 'inn',
'КПП': 'kpp',
'ИдентификаторОрганизации': 'uid_organizaciya'
}
df = df.rename(columns=field_mapping)
with engine.begin() as conn:
if not df.empty:
conn.execute(f"CREATE TEMP TABLE temp_{table_name} AS SELECT * FROM public.{table_name} WHERE 1 = 0")
df.to_sql(
f'temp_{table_name}',
con=conn,
if_exists='append',
index=False,
method='multi'
)
conn.execute(f"DELETE FROM public.{table_name}")
conn.execute(f"""
INSERT INTO public.{table_name} (name, inn, kpp, uid_organizaciya)
SELECT
name,
inn,
kpp,
uid_organizaciya
FROM temp_{table_name}
ON CONFLICT (uid_organizaciya)
DO UPDATE SET
name = EXCLUDED.name,
inn = EXCLUDED.inn,
kpp = EXCLUDED.kpp
WHERE
public.{table_name}.name IS DISTINCT FROM EXCLUDED.name
OR public.{table_name}.inn IS DISTINCT FROM EXCLUDED.inn
OR public.{table_name}.kpp IS DISTINCT FROM EXCLUDED.kpp
"""
)
with DAG(
dag_id='download_dicts_from_1C_source',
default_args=default_args,
description='Выгрузка данных из 1С',
schedule_interval=None, #"0,30 01-10 * * *",
catchup=False,
tags=['sigma'],
) as dag:
take_dogovor_task = PythonOperator(
task_id="take_dogovor",
python_callable=take_dogovor,
provide_context=True
)
take_bank_task = PythonOperator(
task_id="take_bank",
python_callable=take_bank,
provide_context=True
)
take_organizaciya_task = PythonOperator(
task_id="take_organizaciya",
python_callable=take_organizaciya,
provide_context=True
)
[take_dogovor, take_bank_task, take_organizaciya_task]