inflearn logo
강의

Course

Instructor

Apache Airflow with Silicon Valley Engineers

PostgresOperator로 대량의 데이터 업로드 방법 질문드립니다.

Resolved

395

JP

10 asked

1

안녕하세요 선생님 🙂

PostgresOperator 질문 드립니다.

DB table에 데이터를 갱신하는 task를 혼자 만들어보고 있는데요.

PostgresOperator는 executemany와 같은 기능을 지원하지 않는 것으로 확인했습니다.

airflow에서 대량의 데이터를 insert / update 하는 방법이 있을까요..?

python 빅데이터 데이터-엔지니어링 airflow

Answer 1

1

altoformula

안녕하세요 JP님,

일단 executemany같은 경우 PythonOperator를 사용하시고 psycopg2를 이용하시면 될 듯 힙니다. 대충 코드를 만들어 보자면 ...

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
import psycopg2

def batch_insert_to_postgres():
    conn = psycopg2.connect("dbname=your_db user=your_user password=your_password")
    cur = conn.cursor()

    query = "INSERT INTO your_table (column1, column2) VALUES (%s, %s)"
    data = [(1, 'abc'), (2, 'def'), (3, 'ghi')]

    cur.executemany(query, data)

    conn.commit()
    cur.close()
    conn.close()

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 12, 12),
    # other default args
}

dag = DAG('postgres_batch_insert', default_args=default_args, schedule_interval='@daily')

insert_task = PythonOperator(
    task_id='insert_to_postgres',
    python_callable=batch_insert_to_postgres,
    dag=dag,
)

insert_task

하지만 데이터가 엄청나게 많은 데이터 같은 경우(Billion level), 저 같은 경우는 대부분 Spark를 사용해서 로딩을 합니다. 개인용 프로젝트로는 위의 코드처럼 하시면 충분 하실 듯 합니다.

1

JP

답변감사합니다!! ㅠㅠ

추후 큰 데이터로 spark 로딩도 해봐야겠네요!!

빠른 답변 감사합니다!!

1

JP

조금 우여곡절이 있었지만 조언해주신 덕분에 DAG 구동 성공했습니다!
감사합니다 ㅠㅠ

image

6-6

0

8

1

작업형 1 유형 부분

0

11

1

수강평 이벤트

0

17

2

apache airflow 설치하기 질문

0

85

2

postgres_loader DAG 에러

1

52

3

Queue 강의를 듣고 난 후에 대한 질의

0

51

1

공식 compose 내 postgres db 설치시

0

75

2

postgres_loader의 apache-airflow-providers-postgres 버전 호환성

1

184

2

Airflow Limitation 강의에 대한 질문

1

92

1

airflow와 postgres간의 connection 오류

1

1146

4

from airflow.sensors.sql import SqlSensor에 대해 질문 있습니다.

1

254

1

메타데이터 의미

1

367

2

병렬처리 질문드립니다.

1

508

2

connection 정보 이전 방법 질문드립니다.

1

278

1

강의 할인 프로모션 질문입니다..

1

282

1

hook 질문드립니다.

1

347

1

section 2-hook 강의 질문

1

289

1

airflow tasks test 질문드립니다!

1

509

3

airflow docker compose 질문드립니다.

1

419

1

섹션1 apache airflow 설치하기 질문

1

462

1

my_first_dag.py 파일 질문 입니다

1

399

1

Docker 에서 airflow 사용시 질문드립니다

1

956

2

airflow tasks test error

2

577

1

블로그에 글을 올려도 되나요?

0

486

2