inflearn logo
강의

講義

知識共有

Apache Airflow を使用したシリコンバレーのエンジニア

PostgresOperator로 대량의 데이터 업로드 방법 질문드립니다.

解決済みの質問

409

JP

投稿した質問数 10

1

안녕하세요 선생님 🙂

PostgresOperator 질문 드립니다.

DB table에 데이터를 갱신하는 task를 혼자 만들어보고 있는데요.

PostgresOperator는 executemany와 같은 기능을 지원하지 않는 것으로 확인했습니다.

airflow에서 대량의 데이터를 insert / update 하는 방법이 있을까요..?

python 빅데이터 데이터-엔지니어링 airflow

回答 1

1

altoformula

안녕하세요 JP님,

일단 executemany같은 경우 PythonOperator를 사용하시고 psycopg2를 이용하시면 될 듯 힙니다. 대충 코드를 만들어 보자면 ...

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
import psycopg2

def batch_insert_to_postgres():
    conn = psycopg2.connect("dbname=your_db user=your_user password=your_password")
    cur = conn.cursor()

    query = "INSERT INTO your_table (column1, column2) VALUES (%s, %s)"
    data = [(1, 'abc'), (2, 'def'), (3, 'ghi')]

    cur.executemany(query, data)

    conn.commit()
    cur.close()
    conn.close()

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 12, 12),
    # other default args
}

dag = DAG('postgres_batch_insert', default_args=default_args, schedule_interval='@daily')

insert_task = PythonOperator(
    task_id='insert_to_postgres',
    python_callable=batch_insert_to_postgres,
    dag=dag,
)

insert_task

하지만 데이터가 엄청나게 많은 데이터 같은 경우(Billion level), 저 같은 경우는 대부분 Spark를 사용해서 로딩을 합니다. 개인용 프로젝트로는 위의 코드처럼 하시면 충분 하실 듯 합니다.

1

JP

답변감사합니다!! ㅠㅠ

추후 큰 데이터로 spark 로딩도 해봐야겠네요!!

빠른 답변 감사합니다!!

1

JP

조금 우여곡절이 있었지만 조언해주신 덕분에 DAG 구동 성공했습니다!
감사합니다 ㅠㅠ

image

실행이 안 되는데요.

0

2

0

airflow 3로 되면서 2.x대에 지원 중단된 패키지가 많네요..ㅠ

0

44

1

dags 디렉토리안에 sql디렉토리 넣고 .sql 파일로 관리해도 되나요?

0

44

2

apache airflow 설치하기 질문

0

100

2

postgres_loader DAG 에러

1

66

3

Queue 강의를 듣고 난 후에 대한 질의

0

67

1

공식 compose 내 postgres db 설치시

0

85

2

postgres_loader의 apache-airflow-providers-postgres 버전 호환성

1

199

2

Airflow Limitation 강의에 대한 질문

1

98

1

airflow와 postgres간의 connection 오류

1

1159

4

from airflow.sensors.sql import SqlSensor에 대해 질문 있습니다.

1

260

1

메타데이터 의미

1

377

2

병렬처리 질문드립니다.

1

519

2

connection 정보 이전 방법 질문드립니다.

1

284

1

강의 할인 프로모션 질문입니다..

1

289

1

hook 질문드립니다.

1

358

1

section 2-hook 강의 질문

1

300

1

airflow tasks test 질문드립니다!

1

519

3

airflow docker compose 질문드립니다.

1

426

1

섹션1 apache airflow 설치하기 질문

1

473

1

my_first_dag.py 파일 질문 입니다

1

407

1

Docker 에서 airflow 사용시 질문드립니다

1

974

2

airflow tasks test error

2

579

1

블로그에 글을 올려도 되나요?

0

495

2