PostgresOperator로 대량의 데이터 업로드 방법 질문드립니다.
안녕하세요 선생님 🙂
PostgresOperator 질문 드립니다.
DB table에 데이터를 갱신하는 task를 혼자 만들어보고 있는데요.
PostgresOperator는 executemany와 같은 기능을 지원하지 않는 것으로 확인했습니다.
airflow에서 대량의 데이터를 insert / update 하는 방법이 있을까요..?
回答 1
1
안녕하세요 JP님,
일단 executemany같은 경우 PythonOperator를 사용하시고 psycopg2를 이용하시면 될 듯 힙니다. 대충 코드를 만들어 보자면 ...
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
import psycopg2
def batch_insert_to_postgres():
conn = psycopg2.connect("dbname=your_db user=your_user password=your_password")
cur = conn.cursor()
query = "INSERT INTO your_table (column1, column2) VALUES (%s, %s)"
data = [(1, 'abc'), (2, 'def'), (3, 'ghi')]
cur.executemany(query, data)
conn.commit()
cur.close()
conn.close()
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 12, 12),
# other default args
}
dag = DAG('postgres_batch_insert', default_args=default_args, schedule_interval='@daily')
insert_task = PythonOperator(
task_id='insert_to_postgres',
python_callable=batch_insert_to_postgres,
dag=dag,
)
insert_task
하지만 데이터가 엄청나게 많은 데이터 같은 경우(Billion level), 저 같은 경우는 대부분 Spark를 사용해서 로딩을 합니다. 개인용 프로젝트로는 위의 코드처럼 하시면 충분 하실 듯 합니다.
6-6
0
12
1
작업형 1 유형 부분
0
13
1
수강평 이벤트
0
17
2
apache airflow 설치하기 질문
0
85
2
postgres_loader DAG 에러
1
52
3
Queue 강의를 듣고 난 후에 대한 질의
0
51
1
공식 compose 내 postgres db 설치시
0
75
2
postgres_loader의 apache-airflow-providers-postgres 버전 호환성
1
184
2
Airflow Limitation 강의에 대한 질문
1
92
1
airflow와 postgres간의 connection 오류
1
1146
4
from airflow.sensors.sql import SqlSensor에 대해 질문 있습니다.
1
254
1
메타데이터 의미
1
367
2
병렬처리 질문드립니다.
1
508
2
connection 정보 이전 방법 질문드립니다.
1
278
1
강의 할인 프로모션 질문입니다..
1
282
1
hook 질문드립니다.
1
347
1
section 2-hook 강의 질문
1
289
1
airflow tasks test 질문드립니다!
1
509
3
airflow docker compose 질문드립니다.
1
419
1
섹션1 apache airflow 설치하기 질문
1
462
1
my_first_dag.py 파일 질문 입니다
1
399
1
Docker 에서 airflow 사용시 질문드립니다
1
956
2
airflow tasks test error
2
577
1
블로그에 글을 올려도 되나요?
0
486
2


