Добавить dags/upload_dicts.py
This commit is contained in:
parent
b97acac32c
commit
ffa6faa15b
|
|
@ -0,0 +1,229 @@
|
||||||
|
import requests
|
||||||
|
import json
|
||||||
|
import pandas as pd
|
||||||
|
import numpy as np
|
||||||
|
from datetime import datetime
|
||||||
|
from requests.auth import HTTPBasicAuth
|
||||||
|
from sqlalchemy import create_engine
|
||||||
|
from airflow import DAG
|
||||||
|
from airflow.operators.python import PythonOperator
|
||||||
|
|
||||||
|
default_args = {
|
||||||
|
'owner': 'airflow',
|
||||||
|
'depends_on_past': False,
|
||||||
|
'start_date': datetime(2023, 1, 1),
|
||||||
|
'retries': 1,
|
||||||
|
}
|
||||||
|
|
||||||
|
def get_db_engine():
|
||||||
|
"""Создает подключение к PostgreSQL"""
|
||||||
|
DF_CONFIG = {
|
||||||
|
'dbname': "postgres",
|
||||||
|
'user': "postgres",
|
||||||
|
'password': "4a00d4b90cd830da0796",
|
||||||
|
'host': "postgresql",
|
||||||
|
'port': "5432"
|
||||||
|
}
|
||||||
|
return create_engine(
|
||||||
|
f"postgresql+psycopg2://{DF_CONFIG['user']}:{DF_CONFIG['password']}@"
|
||||||
|
f"{DF_CONFIG['host']}:{DF_CONFIG['port']}/{DF_CONFIG['dbname']}",
|
||||||
|
pool_size=10,
|
||||||
|
max_overflow=20
|
||||||
|
)
|
||||||
|
|
||||||
|
def take_dogovor(**kwargs):
|
||||||
|
params = {"СписокСчетов": ["66","66.01","66.02", "66.03","66.04","66.21","66.22","66.23","66.24", "67","67.01","67.02", "67.03","67.04","67.21","67.22","67.23","67.24"]}
|
||||||
|
query = """ВЫБРАТЬ РАЗЛИЧНЫЕ
|
||||||
|
ОстаткиОбороты.Субконто2.Наименование КАК Наименование,
|
||||||
|
ОстаткиОбороты.Субконто2.Номер КАК Номер,
|
||||||
|
ОстаткиОбороты.Субконто2.Дата КАК Дата,
|
||||||
|
ОстаткиОбороты.Субконто2.СрокДействия КАК СрокДействия,
|
||||||
|
UUID(ОстаткиОбороты.Субконто2.Ссылка) КАК ИдентификаторДоговора
|
||||||
|
ИЗ
|
||||||
|
РегистрБухгалтерии.Хозрасчетный.ОстаткиИОбороты КАК ОстаткиОбороты
|
||||||
|
ГДЕ
|
||||||
|
ОстаткиОбороты.Счет.Код В (&СписокСчетов)
|
||||||
|
И ОстаткиОбороты.Субконто2 ЕСТЬ НЕ NULL
|
||||||
|
"""
|
||||||
|
auth = HTTPBasicAuth('obmen', 'bOR2W7w4')
|
||||||
|
response = requests.post(
|
||||||
|
# http://адрес сервера/название БД/hs/services/query?ProcessPostedRequest
|
||||||
|
|
||||||
|
url=r'http://192.168.1.75/chupd/hs/services/query?ProcessPostedRequest',
|
||||||
|
json={"query":query, "params": params},
|
||||||
|
auth=auth,
|
||||||
|
verify=False
|
||||||
|
)
|
||||||
|
|
||||||
|
data_from_1c = response.json()
|
||||||
|
df = pd.DataFrame(data_from_1c['data'])
|
||||||
|
engine = get_db_engine()
|
||||||
|
table_name = 'dict_dogovor'
|
||||||
|
field_mapping = {
|
||||||
|
'Наименование': 'name',
|
||||||
|
'Номер': 'nomer',
|
||||||
|
'Дата': 'date_bedin',
|
||||||
|
'СрокДействия': 'date_end',
|
||||||
|
'ИдентификаторДоговора': 'uid_dogovor'
|
||||||
|
}
|
||||||
|
df = df.rename(columns=field_mapping)
|
||||||
|
with engine.begin() as conn:
|
||||||
|
if not df.empty:
|
||||||
|
conn.execute(f"CREATE TEMP TABLE temp_{table_name} AS SELECT * FROM public.{table_name} WHERE 1 = 0")
|
||||||
|
df.to_sql(
|
||||||
|
f'temp_{table_name}',
|
||||||
|
con=conn,
|
||||||
|
if_exists='append',
|
||||||
|
index=False,
|
||||||
|
method='multi'
|
||||||
|
)
|
||||||
|
conn.execute(f"DELETE FROM public.{table_name}")
|
||||||
|
conn.execute(f"""
|
||||||
|
INSERT INTO public.{table_name}
|
||||||
|
SELECT
|
||||||
|
name,
|
||||||
|
nomer,
|
||||||
|
date_begin,
|
||||||
|
date_end,
|
||||||
|
uid_dogovor
|
||||||
|
FROM temp_{table_name}
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
|
||||||
|
def take_bank(**kwargs):
|
||||||
|
params = {"СписокСчетов": ["66","66.01","66.02", "66.03","66.04","66.21","66.22","66.23","66.24", "67","67.01","67.02", "67.03","67.04","67.21","67.22","67.23","67.24"]}
|
||||||
|
query = """ВЫБРАТЬ РАЗЛИЧНЫЕ
|
||||||
|
ОстаткиОбороты.Субконто1.Наименование КАК Наименование,
|
||||||
|
ОстаткиОбороты.Субконто1.ИНН КАК ИНН,
|
||||||
|
ОстаткиОбороты.Субконто1.КПП КАК КПП,
|
||||||
|
UUID(ОстаткиОбороты.Субконто1.Ссылка) КАК ИдентификаторКонтрагента
|
||||||
|
ИЗ
|
||||||
|
РегистрБухгалтерии.Хозрасчетный.ОстаткиИОбороты КАК ОстаткиОбороты
|
||||||
|
ГДЕ
|
||||||
|
ОстаткиОбороты.Счет.Код В (&СписокСчетов)
|
||||||
|
И ОстаткиОбороты.Субконто1 ЕСТЬ НЕ NULL
|
||||||
|
"""
|
||||||
|
auth = HTTPBasicAuth('obmen', 'bOR2W7w4')
|
||||||
|
response = requests.post(
|
||||||
|
# http://адрес сервера/название БД/hs/services/query?ProcessPostedRequest
|
||||||
|
|
||||||
|
url=r'http://192.168.1.75/chupd/hs/services/query?ProcessPostedRequest',
|
||||||
|
json={"query":query, "params": params},
|
||||||
|
auth=auth,
|
||||||
|
verify=False
|
||||||
|
)
|
||||||
|
|
||||||
|
data_from_1c = response.json()
|
||||||
|
df = pd.DataFrame(data_from_1c['data'])
|
||||||
|
engine = get_db_engine()
|
||||||
|
table_name = 'dict_bank'
|
||||||
|
field_mapping = {
|
||||||
|
'Наименование': 'name',
|
||||||
|
'ИНН': 'inn',
|
||||||
|
'КПП': 'kpp',
|
||||||
|
'ИдентификаторКонтрагента': 'uid_bank'
|
||||||
|
}
|
||||||
|
df = df.rename(columns=field_mapping)
|
||||||
|
with engine.begin() as conn:
|
||||||
|
if not df.empty:
|
||||||
|
conn.execute(f"CREATE TEMP TABLE temp_{table_name} AS SELECT * FROM public.{table_name} WHERE 1 = 0")
|
||||||
|
df.to_sql(
|
||||||
|
f'temp_{table_name}',
|
||||||
|
con=conn,
|
||||||
|
if_exists='append',
|
||||||
|
index=False,
|
||||||
|
method='multi'
|
||||||
|
)
|
||||||
|
conn.execute(f"DELETE FROM public.{table_name}")
|
||||||
|
conn.execute(f"""
|
||||||
|
INSERT INTO public.{table_name}
|
||||||
|
SELECT
|
||||||
|
name,
|
||||||
|
inn,
|
||||||
|
kpp,
|
||||||
|
uid_bank
|
||||||
|
FROM temp_{table_name}
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
|
||||||
|
def take_organizaciya(**kwargs):
|
||||||
|
params = {"СписокСчетов": ["66","66.01","66.02", "66.03","66.04","66.21","66.22","66.23","66.24", "67","67.01","67.02", "67.03","67.04","67.21","67.22","67.23","67.24"]}
|
||||||
|
query = """ВЫБРАТЬ РАЗЛИЧНЫЕ
|
||||||
|
ОстаткиОбороты.Организация.Наименование КАК Наименование,
|
||||||
|
ОстаткиОбороты.Организация.ИНН КАК ИНН,
|
||||||
|
ОстаткиОбороты.Организация.КПП КАК КПП,
|
||||||
|
UUID(ОстаткиОбороты.Организация.Ссылка) КАК ИдентификаторОрганизации
|
||||||
|
ИЗ
|
||||||
|
РегистрБухгалтерии.Хозрасчетный.ОстаткиИОбороты КАК ОстаткиОбороты
|
||||||
|
ГДЕ
|
||||||
|
ОстаткиОбороты.Счет.Код В (&СписокСчетов)
|
||||||
|
"""
|
||||||
|
auth = HTTPBasicAuth('obmen', 'bOR2W7w4')
|
||||||
|
response = requests.post(
|
||||||
|
# http://адрес сервера/название БД/hs/services/query?ProcessPostedRequest
|
||||||
|
|
||||||
|
url=r'http://192.168.1.75/chupd/hs/services/query?ProcessPostedRequest',
|
||||||
|
json={"query":query, "params": params},
|
||||||
|
auth=auth,
|
||||||
|
verify=False
|
||||||
|
)
|
||||||
|
|
||||||
|
data_from_1c = response.json()
|
||||||
|
df = pd.DataFrame(data_from_1c['data'])
|
||||||
|
engine = get_db_engine()
|
||||||
|
table_name = 'dict_organizaciya'
|
||||||
|
field_mapping = {
|
||||||
|
'Наименование': 'name',
|
||||||
|
'ИНН': 'inn',
|
||||||
|
'КПП': 'kpp',
|
||||||
|
'ИдентификаторДоговора': 'uid_organizaciya'
|
||||||
|
}
|
||||||
|
df = df.rename(columns=field_mapping)
|
||||||
|
with engine.begin() as conn:
|
||||||
|
if not df.empty:
|
||||||
|
conn.execute(f"CREATE TEMP TABLE temp_{table_name} AS SELECT * FROM public.{table_name} WHERE 1 = 0")
|
||||||
|
df.to_sql(
|
||||||
|
f'temp_{table_name}',
|
||||||
|
con=conn,
|
||||||
|
if_exists='append',
|
||||||
|
index=False,
|
||||||
|
method='multi'
|
||||||
|
)
|
||||||
|
conn.execute(f"DELETE FROM public.{table_name}")
|
||||||
|
conn.execute(f"""
|
||||||
|
INSERT INTO public.{table_name}
|
||||||
|
SELECT
|
||||||
|
name,
|
||||||
|
inn,
|
||||||
|
kpp,
|
||||||
|
uid_organizaciya
|
||||||
|
FROM temp_{table_name}
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
|
||||||
|
with DAG(
|
||||||
|
dag_id='data_download_from_1C_source',
|
||||||
|
default_args=default_args,
|
||||||
|
description='Выгрузка данных из 1С',
|
||||||
|
schedule_interval=None, #"0,30 01-10 * * *",
|
||||||
|
catchup=False,
|
||||||
|
tags=['sigma'],
|
||||||
|
) as dag:
|
||||||
|
|
||||||
|
take_dogovor_task = PythonOperator(
|
||||||
|
task_id="take_dogovor",
|
||||||
|
python_callable=take_dogovor,
|
||||||
|
provide_context=True
|
||||||
|
)
|
||||||
|
take_bank_task = PythonOperator(
|
||||||
|
task_id="take_bank",
|
||||||
|
python_callable=take_bank,
|
||||||
|
provide_context=True
|
||||||
|
)
|
||||||
|
take_organizaciya_task = PythonOperator(
|
||||||
|
task_id="take_organizaciya",
|
||||||
|
python_callable=take_organizaciya,
|
||||||
|
provide_context=True
|
||||||
|
)
|
||||||
|
|
||||||
|
[read_data_1C_task, take_bank_task, take_organizaciya_task]
|
||||||
Loading…
Reference in New Issue