sigma/dags/OSV.py

162 lines
7.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import requests
import json
import pandas as pd
import numpy as np
from datetime import datetime
from requests.auth import HTTPBasicAuth
from sqlalchemy import create_engine
from airflow import DAG
from airflow.operators.python import PythonOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 1, 1),
'retries': 1,
}
def get_db_engine():
"""Создает подключение к PostgreSQL"""
DF_CONFIG = {
'dbname': "postgres",
'user': "postgres",
'password': "4a00d4b90cd830da0796",
'host': "postgresql",
'port': "5432"
}
return create_engine(
f"postgresql+psycopg2://{DF_CONFIG['user']}:{DF_CONFIG['password']}@"
f"{DF_CONFIG['host']}:{DF_CONFIG['port']}/{DF_CONFIG['dbname']}",
pool_size=10,
max_overflow=20
)
def read_data_1C(**kwargs):
params = {"СписокСчетов": ["66","66.01","66.02", "66.03","66.04","66.21","66.22","66.23","66.24", "67","67.01","67.02", "67.03","67.04","67.21","67.22","67.23","67.24"]}
query = """ВЫБРАТЬ
ОстаткиОбороты.Счет,
ОстаткиОбороты.Субконто1,
ОстаткиОбороты.Субконто2,
ОстаткиОбороты.Организация,
ОстаткиОбороты.Субконто2.Номер КАК НомерДоговора,
ОстаткиОбороты.Субконто2.Дата КАК ДатаДоговора,
ОстаткиОбороты.Субконто2.СрокДействия КАК СрокДействияДоговора,
ОстаткиОбороты.Субконто1.Инн КАК ИннКонтрагента,
ОстаткиОбороты.Организация.Инн КАК ИннКлиента,
ОстаткиОбороты.СуммаОборот,
ОстаткиОбороты.СуммаОборотДт,
ОстаткиОбороты.СуммаОборотКт,
ОстаткиОбороты.СуммаКонечныйОстаток,
ОстаткиОбороты.СуммаКонечныйОстатокДт,
ОстаткиОбороты.СуммаКонечныйОстатокКт,
ОстаткиОбороты.СуммаКонечныйРазвернутыйОстатокДт,
ОстаткиОбороты.СуммаКонечныйРазвернутыйОстатокКт,
UUID(ОстаткиОбороты.Субконто2.Ссылка) КАК ИдентификаторДоговора,
UUID(ОстаткиОбороты.Субконто1.Ссылка) КАК ИдентификаторКонтрагента,
UUID(ОстаткиОбороты.Организация.Ссылка) КАК ИдентификаторКлиента
ИЗ
РегистрБухгалтерии.Хозрасчетный.ОстаткиИОбороты КАК ОстаткиОбороты
ГДЕ
ОстаткиОбороты.Счет.Код В (&СписокСчетов)
"""
auth = HTTPBasicAuth('obmen', 'bOR2W7w4')
response = requests.post(
# http://адрес сервера/название БД/hs/services/query?ProcessPostedRequest
url=r'http://192.168.1.75/chupd/hs/services/query?ProcessPostedRequest',
json={"query":query, "params": params},
auth=auth,
verify=False
)
data_from_1c = response.json()
df = pd.DataFrame(data_from_1c['data'])
engine = get_db_engine()
table_name = 'oborotno_salbdovaya_vedomostb'
field_mapping = {
# Основные поля
'Счет': 'schet',
'Субконто1': 'subkonto1',
'Субконто2': 'subkonto2',
'Организация': 'organizaciya',
'НомерДоговора': 'nomer',
'ДатаДоговора': 'date_begin',
'СрокДействияДоговора': 'date_end',
'ИннКонтрагента': 'inn_subkonto1',
'ИннКлиента': 'inn_organizaciya',
# Суммовые остатки и обороты
'СуммаОборот': 'summa_oborot',
'СуммаОборотДт': 'summa_oborot_dt',
'СуммаОборотКт': 'summa_oborot_kt',
'СуммаКонечныйОстаток': 'summa_konechnyy_ostatok',
'СуммаКонечныйОстатокДт': 'summa_konechnyy_ostatok_dt',
'СуммаКонечныйОстатокКт': 'summa_konechnyy_ostatok_kt',
'СуммаКонечныйРазвернутыйОстатокДт': 'summa_konechnyy_razvernutyy_ostatok_dt',
'СуммаКонечныйРазвернутыйОстатокКт': 'summa_konechnyy_razvernutyy_ostatok_kt',
#ID
'ИдентификаторДоговора': 'uid_subkonto2',
'ИдентификаторКонтрагента': 'uid_subkonto1',
'ИдентификаторКлиента': 'uid_organizaciya'
}
df = df.rename(columns=field_mapping)
with engine.begin() as conn:
if not df.empty:
conn.execute(f"CREATE TEMP TABLE temp_{table_name} AS SELECT * FROM public.{table_name} WHERE 1 = 0")
df.to_sql(
f'temp_{table_name}',
con=conn,
if_exists='append',
index=False,
method='multi'
)
conn.execute(f"DELETE FROM public.{table_name} where get_date = CURRENT_DATE::date::timestamp")
conn.execute(f"""
INSERT INTO public.{table_name}
SELECT
schet
, subkonto1
, subkonto2
, organizaciya
, case when nomer = '' then null else nomer end nomer
, case when TO_DATE(date_begin, 'DD.MM.YYYY') = '0001-01-01' then null else date_begin end date_begin
, case when TO_DATE(date_end, 'DD.MM.YYYY') = '0001-01-01' then null else date_end end date_end
, inn_subkonto1 as inn_subkonto1
, inn_organizaciya as inn_organizaciya
, summa_oborot
, summa_oborot_dt
, summa_oborot_kt
, summa_konechnyy_ostatok
, summa_konechnyy_ostatok_dt
, summa_konechnyy_ostatok_kt
, summa_konechnyy_razvernutyy_ostatok_dt
, summa_konechnyy_razvernutyy_ostatok_kt
, uid_subkonto2
, uid_subkonto1
, uid_organizaciya
, CURRENT_DATE::date::timestamp
FROM temp_{table_name}
--ON CONFLICT (schet, uid_subkonto2, uid_subkonto1, uid_organizaciya)
"""
)
with DAG(
dag_id='data_download_from_1C_source',
default_args=default_args,
description='Выгрузка данных из 1С',
schedule_interval=None, #"0,30 01-10 * * *",
catchup=False,
tags=['sigma'],
) as dag:
read_data_1C_task = PythonOperator(
task_id="read_data_1C",
python_callable=read_data_1C,
provide_context=True
)
read_data_1C_task