sigma/dags/upload_dicts.py

252 lines
9.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import requests
import json
import pandas as pd
import numpy as np
from datetime import datetime
from requests.auth import HTTPBasicAuth
from sqlalchemy import create_engine
from airflow import DAG
from airflow.operators.python import PythonOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 1, 1),
'retries': 1,
}
def get_db_engine():
"""Создает подключение к PostgreSQL"""
DF_CONFIG = {
'dbname': "postgres",
'user': "postgres",
'password': "4a00d4b90cd830da0796",
'host': "postgresql",
'port': "5432"
}
return create_engine(
f"postgresql+psycopg2://{DF_CONFIG['user']}:{DF_CONFIG['password']}@"
f"{DF_CONFIG['host']}:{DF_CONFIG['port']}/{DF_CONFIG['dbname']}",
pool_size=10,
max_overflow=20
)
def take_dogovor(**kwargs):
params = {"СписокСчетов": ["66","66.01","66.02", "66.03","66.04","66.21","66.22","66.23","66.24", "67","67.01","67.02", "67.03","67.04","67.21","67.22","67.23","67.24"]}
query = """ВЫБРАТЬ РАЗЛИЧНЫЕ
ОстаткиОбороты.Субконто2.Наименование КАК Наименование,
ОстаткиОбороты.Субконто2.Номер КАК Номер,
ОстаткиОбороты.Субконто2.Дата КАК Дата,
ОстаткиОбороты.Субконто2.СрокДействия КАК СрокДействия,
UUID(ОстаткиОбороты.Субконто2.Ссылка) КАК ИдентификаторДоговора
ИЗ
РегистрБухгалтерии.Хозрасчетный.ОстаткиИОбороты КАК ОстаткиОбороты
ГДЕ
ОстаткиОбороты.Счет.Код В (&СписокСчетов)
И ОстаткиОбороты.Субконто2 ЕСТЬ НЕ NULL
"""
auth = HTTPBasicAuth('obmen', 'bOR2W7w4')
response = requests.post(
# http://адрес сервера/название БД/hs/services/query?ProcessPostedRequest
url=r'http://192.168.1.75/chupd/hs/services/query?ProcessPostedRequest',
json={"query":query, "params": params},
auth=auth,
verify=False
)
data_from_1c = response.json()
df = pd.DataFrame(data_from_1c['data'])
engine = get_db_engine()
table_name = 'dict_dogovor'
field_mapping = {
'Наименование': 'name',
'Номер': 'nomer',
'Дата': 'date_begin',
'СрокДействия': 'date_end',
'ИдентификаторДоговора': 'uid_dogovor'
}
df = df.rename(columns=field_mapping)
with engine.begin() as conn:
if not df.empty:
conn.execute(f"CREATE TEMP TABLE temp_{table_name} AS SELECT * FROM public.{table_name} WHERE 1 = 0")
df.to_sql(
f'temp_{table_name}',
con=conn,
if_exists='append',
index=False,
method='multi'
)
# conn.execute(f"DELETE FROM public.{table_name}")
conn.execute(f"""
INSERT INTO public.{table_name} (id, name, nomer, date_begin, date_end, uid_dogovor)
SELECT
row_number() over (order by uid_dogovor) id,
name,
nomer,
date_begin,
date_end,
uid_dogovor
FROM temp_{table_name}
ON CONFLICT (uid_dogovor)
DO UPDATE SET
id = EXCLUDED.id,
name = EXCLUDED.name,
nomer = EXCLUDED.nomer,
date_begin = EXCLUDED.date_begin,
date_end = EXCLUDED.date_end;
"""
)
def take_bank(**kwargs):
params = {"СписокСчетов": ["66","66.01","66.02", "66.03","66.04","66.21","66.22","66.23","66.24", "67","67.01","67.02", "67.03","67.04","67.21","67.22","67.23","67.24"]}
query = """ВЫБРАТЬ РАЗЛИЧНЫЕ
ОстаткиОбороты.Субконто1.Наименование КАК Наименование,
ОстаткиОбороты.Субконто1.ИНН КАК ИНН,
ОстаткиОбороты.Субконто1.КПП КАК КПП,
UUID(ОстаткиОбороты.Субконто1.Ссылка) КАК ИдентификаторКонтрагента
ИЗ
РегистрБухгалтерии.Хозрасчетный.ОстаткиИОбороты КАК ОстаткиОбороты
ГДЕ
ОстаткиОбороты.Счет.Код В (&СписокСчетов)
И ОстаткиОбороты.Субконто1 ЕСТЬ НЕ NULL
"""
auth = HTTPBasicAuth('obmen', 'bOR2W7w4')
response = requests.post(
# http://адрес сервера/название БД/hs/services/query?ProcessPostedRequest
url=r'http://192.168.1.75/chupd/hs/services/query?ProcessPostedRequest',
json={"query":query, "params": params},
auth=auth,
verify=False
)
data_from_1c = response.json()
df = pd.DataFrame(data_from_1c['data'])
engine = get_db_engine()
table_name = 'dict_bank'
field_mapping = {
'Наименование': 'name',
'ИНН': 'inn',
'КПП': 'kpp',
'ИдентификаторКонтрагента': 'uid_bank'
}
df = df.rename(columns=field_mapping)
with engine.begin() as conn:
if not df.empty:
conn.execute(f"CREATE TEMP TABLE temp_{table_name} AS SELECT * FROM public.{table_name} WHERE 1 = 0")
df.to_sql(
f'temp_{table_name}',
con=conn,
if_exists='append',
index=False,
method='multi'
)
# conn.execute(f"DELETE FROM public.{table_name}")
conn.execute(f"""
INSERT INTO public.{table_name} (id, name, inn, kpp, uid_bank)
SELECT
row_number() over (order by uid_bank) id,
name,
inn,
kpp,
uid_bank
FROM temp_{table_name}
ON CONFLICT (uid_bank)
DO UPDATE SET
id = EXCLUDED.id,
name = EXCLUDED.name,
inn = EXCLUDED.inn,
kpp = EXCLUDED.kpp;
"""
)
def take_organizaciya(**kwargs):
params = {"СписокСчетов": ["66","66.01","66.02", "66.03","66.04","66.21","66.22","66.23","66.24", "67","67.01","67.02", "67.03","67.04","67.21","67.22","67.23","67.24"]}
query = """ВЫБРАТЬ РАЗЛИЧНЫЕ
ОстаткиОбороты.Организация.Наименование КАК Наименование,
ОстаткиОбороты.Организация.ИНН КАК ИНН,
ОстаткиОбороты.Организация.КПП КАК КПП,
UUID(ОстаткиОбороты.Организация.Ссылка) КАК ИдентификаторОрганизации
ИЗ
РегистрБухгалтерии.Хозрасчетный.ОстаткиИОбороты КАК ОстаткиОбороты
ГДЕ
ОстаткиОбороты.Счет.Код В (&СписокСчетов)
"""
auth = HTTPBasicAuth('obmen', 'bOR2W7w4')
response = requests.post(
# http://адрес сервера/название БД/hs/services/query?ProcessPostedRequest
url=r'http://192.168.1.75/chupd/hs/services/query?ProcessPostedRequest',
json={"query":query, "params": params},
auth=auth,
verify=False
)
data_from_1c = response.json()
df = pd.DataFrame(data_from_1c['data'])
engine = get_db_engine()
table_name = 'dict_organizaciya'
field_mapping = {
'Наименование': 'name',
'ИНН': 'inn',
'КПП': 'kpp',
'ИдентификаторОрганизации': 'uid_organizaciya'
}
df = df.rename(columns=field_mapping)
with engine.begin() as conn:
if not df.empty:
conn.execute(f"CREATE TEMP TABLE temp_{table_name} AS SELECT * FROM public.{table_name} WHERE 1 = 0")
df.to_sql(
f'temp_{table_name}',
con=conn,
if_exists='append',
index=False,
method='multi'
)
# conn.execute(f"DELETE FROM public.{table_name}")
conn.execute(f"""
INSERT INTO public.{table_name} (id, name, inn, kpp, uid_organizaciya)
SELECT
row_number() over (order by uid_organizaciya) id,
name,
inn,
kpp,
uid_organizaciya
FROM temp_{table_name}
ON CONFLICT (uid_organizaciya)
DO UPDATE SET
id = EXCLUDED.id,
name = EXCLUDED.name,
inn = EXCLUDED.inn,
kpp = EXCLUDED.kpp;
"""
)
with DAG(
dag_id='download_dicts_from_1C_source',
default_args=default_args,
description='Выгрузка данных из 1С',
schedule_interval=None, #"0,30 01-10 * * *",
catchup=False,
tags=['sigma'],
) as dag:
take_dogovor_task = PythonOperator(
task_id="take_dogovor",
python_callable=take_dogovor,
provide_context=True
)
take_bank_task = PythonOperator(
task_id="take_bank",
python_callable=take_bank,
provide_context=True
)
take_organizaciya_task = PythonOperator(
task_id="take_organizaciya",
python_callable=take_organizaciya,
provide_context=True
)
[take_dogovor, take_bank_task, take_organizaciya_task]