sigma/dags/OSV.py

147 lines
7.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import requests
import json
import pandas as pd
import numpy as np
from datetime import datetime
from requests.auth import HTTPBasicAuth
from sqlalchemy import create_engine
from airflow import DAG
from airflow.operators.python import PythonOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 1, 1),
'retries': 1,
}
def get_db_engine():
"""Создает подключение к PostgreSQL"""
DF_CONFIG = {
'dbname': "postgres",
'user': "postgres",
'password': "4a00d4b90cd830da0796",
'host': "postgresql",
'port': "5432"
}
return create_engine(
f"postgresql+psycopg2://{DF_CONFIG['user']}:{DF_CONFIG['password']}@"
f"{DF_CONFIG['host']}:{DF_CONFIG['port']}/{DF_CONFIG['dbname']}",
pool_size=10,
max_overflow=20
)
def read_data_1C(**kwargs):
params = {"СписокСчетов": ["66","66.01","66.02", "66.03","66.04","66.21","66.22","66.23","66.24", "67","67.01","67.02", "67.03","67.04","67.21","67.22","67.23","67.24"]}
query = """ВЫБРАТЬ *
ИЗ
РегистрБухгалтерии.Хозрасчетный.ОстаткиИОбороты КАК ОстаткиИОбороты
ГДЕ
ОстаткиИОбороты.Счет.Код В (&СписокСчетов)
"""
auth = HTTPBasicAuth('obmen', 'bOR2W7w4')
response = requests.post(
# http://адрес сервера/название БД/hs/services/query?ProcessPostedRequest
#
url=r'http://192.168.1.75/chupd/hs/services/query?ProcessPostedRequest',
json={"query":query, "params": params},
auth=auth,
verify=False
)
data_from_1c = response.json()
df = pd.DataFrame(data_from_1c['data'])
engine = get_db_engine()
table_name = 'oborotno_salbdovaya_vedomostb'
field_mapping = {
# Основные поля
'Счет': 'schet',
'Субконто1': 'subkonto1',
'Субконто2': 'subkonto2',
'Субконто3': 'subkonto3',
'Организация': 'organizaciya',
'Валюта': 'valyuta',
# Суммовые остатки и обороты
'СуммаНачальныйОстаток': 'summa_nachalnyy_ostatok',
'СуммаНачальныйОстатокДт': 'summa_nachalnyy_ostatok_dt',
'СуммаНачальныйОстатокКт': 'summa_nachalnyy_ostatok_kt',
'СуммаНачальныйРазвернутыйОстатокДт': 'summa_nachalnyy_razvernutyy_ostatok_dt',
'СуммаНачальныйРазвернутыйОстатокКт': 'summa_nachalnyy_razvernutyy_ostatok_kt',
'СуммаОборот': 'summa_oborot',
'СуммаОборotДт': 'summa_oborot_dt',
'СуммаОборotКт': 'summa_oborot_kt',
'СуммаКонечныйОстаток': 'summa_konechnyy_ostatok',
'СуммаКонечныйОстатокДт': 'summa_konechnyy_ostatok_dt',
'СуммаКонечныйОстатокКт': 'summa_konechnyy_ostatok_kt',
'СуммаКонечныйРазвернутыйОстатокДт': 'summa_konechnyy_razvernutyy_ostatok_dt',
'СуммаКонечныйРазвернутыйОстатокКт': 'summa_konechnyy_razvernutyy_ostatok_kt',
# Валютные остатки и обороты
'ВалютнаяСуммаНачальныйОстаток': 'valyutnaya_summa_nachalnyy_ostatok',
'ВалютнаяСуммаНачальныйОстатокДт': 'valyutnaya_summa_nachalnyy_ostatok_dt',
'ВалютнаяСуммаНачальныйОстатокКт': 'valyutnaya_summa_nachalnyy_ostatok_kt',
'ВалютнаяСуммаНачальныйРазвернутыйОстатокДт': 'valyutnaya_summa_nachalnyy_razvernutyy_ostatok_dt',
'ВалютнаяСумmaНачальныйРазвернутыйОстатокКт': 'valyutnaya_summa_nachalnyy_razvernutyy_ostatok_kt',
'ВалютнаяСумmaОборот': 'valyutnaya_summa_oborot',
'ВалютнаяСумmaОборotДт': 'valyutnaya_summa_oborot_dt',
'ВалютнаяСумmaОборotКт': 'valyutnaya_summa_oborot_kt',
'ВалютнаяСумmaКонечныйОстаток': 'valyutnaya_summa_konechnyy_ostatok',
'ВалютнаяСумmaКонечныйОстатокДт': 'valyutnaya_summa_konechnyy_ostatok_dt',
'ВалютнаяСумmaКонечныйОстатокКт': 'valyutnaya_summa_konechnyy_ostatok_kt',
'ВалютнаяСумmaКонечныйРазвернутыйОстатокДт': 'valyutnaya_summa_konechnyy_razvernutyy_ostatok_dt',
'ВалютнаяСумmaКонечныйРазвернутыйОстатокКт': 'valyutnaya_summa_konechnyy_razvernutyy_ostatok_kt',
# Количественные остатки и обороты
'КоличествоНачальныйОстаток': 'kolichestvo_nachalnyy_ostatok',
'КоличествоНачальныйОстатокДт': 'kolichestvo_nachalnyy_ostatok_dt',
'КоличествоНачальныйОстатокКт': 'kolichestvo_nachalnyy_ostatok_kt',
'КоличествоНачальныйРазвернутыйОстатокДт': 'kolichestvo_nachalnyy_razvernutyy_ostatok_dt',
'КоличествоНачальныйРазвернутыйОстатокКт': 'kolichestvo_nachalnyy_razvernutyy_ostatok_kt',
'КоличествоОборот': 'kolichestvo_oborot',
'КоличествоОборотДт': 'kolichestvo_oborot_dt',
'КоличествоОборотКт': 'kolichestvo_oborot_kt',
'КоличествоКонечныйОстаток': 'kolichestvo_konechnyy_ostatok',
'КоличествоКонечныйОстатокДт': 'kolichestvo_konechnyy_ostatok_dt',
'КоличествоКонечныйОстатокКт': 'kolichestvo_konechnyy_ostatok_kt',
'КоличествоКонечныйРазвернутыйОстатокДт': 'kolichestvo_konechnyy_razvernutyy_ostatok_dt',
'КоличествоКонечныйРазвернутыйОстатокКт': 'kolichestvo_konechnyy_razvernutyy_ostatok_kt'
}
df = df.rename(columns=field_mapping)
with engine.begin() as conn:
if not df.empty:
conn.execute(f"CREATE TEMP TABLE temp_{table_name} AS SELECT * FROM public.{table_name} WHERE 1 = 0")
df.to_sql(
f'temp_{table_name}',
con=conn,
if_exists='append',
index=False,
method='multi'
)
conn.execute(f"DELETE FROM public.{table_name} WHERE ")
conn.execute(f"""
INSERT INTO public.{table_name}
SELECT * FROM temp_{table_name}
--ON CONFLICT (schet, subkonto1, subkonto2, organizaciya)
"""
)
with DAG(
dag_id='data_download_from_1C_source',
default_args=default_args,
description='Выгрузка данных из 1С',
schedule_interval=None, #"0,30 01-10 * * *",
catchup=False,
tags=['sigma'],
) as dag:
read_data_1C_task = PythonOperator(
task_id="read_data_1C",
python_callable=read_data_1C,
provide_context=True
)
read_data_1C_task