159 lines
8.3 KiB
Python
159 lines
8.3 KiB
Python
import requests
|
||
import json
|
||
import pandas as pd
|
||
import numpy as np
|
||
from datetime import datetime
|
||
from requests.auth import HTTPBasicAuth
|
||
from sqlalchemy import create_engine
|
||
from airflow import DAG
|
||
from airflow.operators.python import PythonOperator
|
||
|
||
default_args = {
|
||
'owner': 'airflow',
|
||
'depends_on_past': False,
|
||
'start_date': datetime(2023, 1, 1),
|
||
'retries': 1,
|
||
}
|
||
|
||
def get_db_engine():
|
||
"""Создает подключение к PostgreSQL"""
|
||
DF_CONFIG = {
|
||
'dbname': "postgres",
|
||
'user': "postgres",
|
||
'password': "4a00d4b90cd830da0796",
|
||
'host': "postgresql",
|
||
'port': "5432"
|
||
}
|
||
return create_engine(
|
||
f"postgresql+psycopg2://{DF_CONFIG['user']}:{DF_CONFIG['password']}@"
|
||
f"{DF_CONFIG['host']}:{DF_CONFIG['port']}/{DF_CONFIG['dbname']}",
|
||
pool_size=10,
|
||
max_overflow=20
|
||
)
|
||
|
||
def read_data_1C(**kwargs):
|
||
params = {"СписокСчетов": ["66","66.01","66.02", "66.03","66.04","66.21","66.22","66.23","66.24", "67","67.01","67.02", "67.03","67.04","67.21","67.22","67.23","67.24"]}
|
||
query = """ВЫБРАТЬ
|
||
ОстаткиИОбороты.Счет,
|
||
ОстаткиИОбороты.Субконто1,
|
||
ОстаткиИОбороты.Субконто2,
|
||
ОстаткиИОбороты.Субконто3,
|
||
ОстаткиИОбороты.Организация,
|
||
ОстаткиИОбороты.СуммаОборот,
|
||
ОстаткиИОбороты.СуммаОборотДт,
|
||
ОстаткиИОбороты.СуммаОборотКт,
|
||
ОстаткиИОбороты.СуммаКонечныйОстаток,
|
||
ОстаткиИОбороты.СуммаКонечныйОстатокДт,
|
||
ОстаткиИОбороты.СуммаКонечныйОстатокКт,
|
||
ОстаткиИОбороты.СуммаКонечныйРазвернутыйОстатокДт,
|
||
ОстаткиИОбороты.СуммаКонечныйРазвернутыйОстатокКт
|
||
ИЗ
|
||
РегистрБухгалтерии.Хозрасчетный.ОстаткиИОбороты КАК ОстаткиИОбороты
|
||
ГДЕ
|
||
ОстаткиИОбороты.Счет.Код В (&СписокСчетов)
|
||
"""
|
||
|
||
auth = HTTPBasicAuth('obmen', 'bOR2W7w4')
|
||
response = requests.post(
|
||
# http://адрес сервера/название БД/hs/services/query?ProcessPostedRequest
|
||
#
|
||
url=r'http://192.168.1.75/chupd/hs/services/query?ProcessPostedRequest',
|
||
json={"query":query, "params": params},
|
||
auth=auth,
|
||
verify=False
|
||
)
|
||
|
||
data_from_1c = response.json()
|
||
df = pd.DataFrame(data_from_1c['data'])
|
||
engine = get_db_engine()
|
||
table_name = 'oborotno_salbdovaya_vedomostb'
|
||
|
||
field_mapping = {
|
||
# Основные поля
|
||
'Счет': 'schet',
|
||
'Субконто1': 'subkonto1',
|
||
'Субконто2': 'subkonto2',
|
||
'Организация': 'organizaciya',
|
||
# 'Валюта': 'valyuta',
|
||
|
||
# Суммовые остатки и обороты
|
||
# 'СуммаНачальныйОстаток': 'summa_nachalnyy_ostatok',
|
||
# 'СуммаНачальныйОстатокДт': 'summa_nachalnyy_ostatok_dt',
|
||
# 'СуммаНачальныйОстатокКт': 'summa_nachalnyy_ostatok_kt',
|
||
# 'СуммаНачальныйРазвернутыйОстатокДт': 'summa_nachalnyy_razvernutyy_ostatok_dt',
|
||
# 'СуммаНачальныйРазвернутыйОстатокКт': 'summa_nachalnyy_razvernutyy_ostatok_kt',
|
||
'СуммаОборот': 'summa_oborot',
|
||
'СуммаОборотДт': 'summa_oborot_dt',
|
||
'СуммаОборотКт': 'summa_oborot_kt',
|
||
'СуммаКонечныйОстаток': 'summa_konechnyy_ostatok',
|
||
'СуммаКонечныйОстатокДт': 'summa_konechnyy_ostatok_dt',
|
||
'СуммаКонечныйОстатокКт': 'summa_konechnyy_ostatok_kt',
|
||
'СуммаКонечныйРазвернутыйОстатокДт': 'summa_konechnyy_razvernutyy_ostatok_dt',
|
||
'СуммаКонечныйРазвернутыйОстатокКт': 'summa_konechnyy_razvernutyy_ostatok_kt'
|
||
|
||
# Валютные остатки и обороты
|
||
# 'ВалютнаяСуммаНачальныйОстаток': 'valyutnaya_summa_nachalnyy_ostatok',
|
||
# 'ВалютнаяСуммаНачальныйОстатокДт': 'valyutnaya_summa_nachalnyy_ostatok_dt',
|
||
# 'ВалютнаяСуммаНачальныйОстатокКт': 'valyutnaya_summa_nachalnyy_ostatok_kt',
|
||
# 'ВалютнаяСуммаНачальныйРазвернутыйОстатокДт': 'valyutnaya_summa_nachalnyy_razvernutyy_ostatok_dt',
|
||
# 'ВалютнаяСуммаНачальныйРазвернутыйОстатокКт': 'valyutnaya_summa_nachalnyy_razvernutyy_ostatok_kt',
|
||
# 'ВалютнаяСуммаОборот': 'valyutnaya_summa_oborot',
|
||
# 'ВалютнаяСуммаОборотДт': 'valyutnaya_summa_oborot_dt',
|
||
# 'ВалютнаяСуммаОборотКт': 'valyutnaya_summa_oborot_kt',
|
||
# 'ВалютнаяСуммаКонечныйОстаток': 'valyutnaya_summa_konechnyy_ostatok',
|
||
# 'ВалютнаяСуммаКонечныйОстатокДт': 'valyutnaya_summa_konechnyy_ostatok_dt',
|
||
# 'ВалютнаяСуммаКонечныйОстатокКт': 'valyutnaya_summa_konechnyy_ostatok_kt',
|
||
# 'ВалютнаяСуммаКонечныйРазвернутыйОстатокДт': 'valyutnaya_summa_konechnyy_razvernutyy_ostatok_dt',
|
||
# 'ВалютнаяСуммаКонечныйРазвернутыйОстатокКт': 'valyutnaya_summa_konechnyy_razvernutyy_ostatok_kt',
|
||
|
||
# Количественные остатки и обороты
|
||
# 'КоличествоНачальныйОстаток': 'kolichestvo_nachalnyy_ostatok',
|
||
# 'КоличествоНачальныйОстатокДт': 'kolichestvo_nachalnyy_ostatok_dt',
|
||
# 'КоличествоНачальныйОстатокКт': 'kolichestvo_nachalnyy_ostatok_kt',
|
||
# 'КоличествоНачальныйРазвернутыйОстатокДт': 'kolichestvo_nachalnyy_razvernutyy_ostatok_dt',
|
||
# 'КоличествоНачальныйРазвернутыйОстатокКт': 'kolichestvo_nachalnyy_razvernutyy_ostatok_kt',
|
||
# 'КоличествоОборот': 'kolichestvo_oborot',
|
||
# 'КоличествоОборотДт': 'kolichestvo_oborot_dt',
|
||
# 'КоличествоОборотКт': 'kolichestvo_oborot_kt',
|
||
# 'КоличествоКонечныйОстаток': 'kolichestvo_konechnyy_ostatok',
|
||
# 'КоличествоКонечныйОстатокДт': 'kolichestvo_konechnyy_ostatok_dt',
|
||
# 'КоличествоКонечныйОстатокКт': 'kolichestvo_konechnyy_ostatok_kt',
|
||
# 'КоличествоКонечныйРазвернутыйОстатокДт': 'kolichestvo_konechnyy_razvernutyy_ostatok_dt',
|
||
# 'КоличествоКонечныйРазвернутыйОстатокКт': 'kolichestvo_konechnyy_razvernutyy_ostatok_kt'
|
||
}
|
||
|
||
df = df.rename(columns=field_mapping)
|
||
with engine.begin() as conn:
|
||
if not df.empty:
|
||
conn.execute(f"CREATE TEMP TABLE temp_{table_name} AS SELECT * FROM public.{table_name} WHERE 1 = 0")
|
||
df.to_sql(
|
||
f'temp_{table_name}',
|
||
con=conn,
|
||
if_exists='append',
|
||
index=False,
|
||
method='multi'
|
||
)
|
||
conn.execute(f"DELETE FROM public.{table_name}")
|
||
conn.execute(f"""
|
||
INSERT INTO public.{table_name}
|
||
SELECT * FROM temp_{table_name}
|
||
--ON CONFLICT (schet, subkonto1, subkonto2, organizaciya)
|
||
"""
|
||
)
|
||
|
||
with DAG(
|
||
dag_id='data_download_from_1C_source',
|
||
default_args=default_args,
|
||
description='Выгрузка данных из 1С',
|
||
schedule_interval=None, #"0,30 01-10 * * *",
|
||
catchup=False,
|
||
tags=['sigma'],
|
||
) as dag:
|
||
|
||
read_data_1C_task = PythonOperator(
|
||
task_id="read_data_1C",
|
||
python_callable=read_data_1C,
|
||
provide_context=True
|
||
)
|
||
|
||
read_data_1C_task |