Обновить dags/OSV.py
This commit is contained in:
parent
1e3c2abb4f
commit
86e8149252
21
dags/OSV.py
21
dags/OSV.py
|
|
@ -33,7 +33,7 @@ def get_db_engine():
|
||||||
|
|
||||||
def read_data_1C(**kwargs):
|
def read_data_1C(**kwargs):
|
||||||
params = {"СписокСчетов": ["66","66.01","66.02", "66.03","66.04","66.21","66.22","66.23","66.24", "67","67.01","67.02", "67.03","67.04","67.21","67.22","67.23","67.24"]}
|
params = {"СписокСчетов": ["66","66.01","66.02", "66.03","66.04","66.21","66.22","66.23","66.24", "67","67.01","67.02", "67.03","67.04","67.21","67.22","67.23","67.24"]}
|
||||||
query = """ВЫБРАТЬ ПЕРВЫЕ 5 *
|
query = """ВЫБРАТЬ *
|
||||||
ИЗ
|
ИЗ
|
||||||
РегистрБухгалтерии.Хозрасчетный.ОстаткиИОбороты КАК ОстаткиИОбороты
|
РегистрБухгалтерии.Хозрасчетный.ОстаткиИОбороты КАК ОстаткиИОбороты
|
||||||
ГДЕ
|
ГДЕ
|
||||||
|
|
@ -52,6 +52,25 @@ def read_data_1C(**kwargs):
|
||||||
|
|
||||||
data_from_1c = response.json()
|
data_from_1c = response.json()
|
||||||
df = pd.DataFrame(data_from_1c['data'])
|
df = pd.DataFrame(data_from_1c['data'])
|
||||||
|
table_name = 'oborotno_salbdovaya_vedomostb'
|
||||||
|
|
||||||
|
with engine.begin() as conn:
|
||||||
|
if not df.empty:
|
||||||
|
conn.execute(f"CREATE TEMP TABLE temp_{table_name} AS SELECT * FROM public.{table_name} WHERE 1 = 0")
|
||||||
|
df.to_sql(
|
||||||
|
f'temp_{table_name}',
|
||||||
|
con=conn
|
||||||
|
if_exists='append',
|
||||||
|
index=False,
|
||||||
|
method='multi'
|
||||||
|
)
|
||||||
|
conn.execute(f"DELETE FROM public.{table_name} WHERE ")
|
||||||
|
conn.execute(f"""
|
||||||
|
INSERT INTO public.{table_name}
|
||||||
|
SELECT * FROM temp_{table_name}
|
||||||
|
--ON CONFLICT (schet, subkonto1, subkonto2, organizaciya)
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
|
||||||
with DAG(
|
with DAG(
|
||||||
dag_id='data_download_from_1C_source',
|
dag_id='data_download_from_1C_source',
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue