inflearn logo
강의

강의

N
챌린지

챌린지

멘토링

멘토링

N
클립

클립

로드맵

로드맵

지식공유

실리콘밸리 엔지니어와 함께하는 Apache Airflow

PostgresOperator로 대량의 데이터 업로드 방법 질문드립니다.

해결된 질문

396

JP

작성한 질문수 10

1

안녕하세요 선생님 🙂

PostgresOperator 질문 드립니다.

DB table에 데이터를 갱신하는 task를 혼자 만들어보고 있는데요.

PostgresOperator는 executemany와 같은 기능을 지원하지 않는 것으로 확인했습니다.

airflow에서 대량의 데이터를 insert / update 하는 방법이 있을까요..?

python 빅데이터 데이터-엔지니어링 airflow

답변 1

1

미쿡엔지니어

안녕하세요 JP님,

일단 executemany같은 경우 PythonOperator를 사용하시고 psycopg2를 이용하시면 될 듯 힙니다. 대충 코드를 만들어 보자면 ...

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
import psycopg2

def batch_insert_to_postgres():
    conn = psycopg2.connect("dbname=your_db user=your_user password=your_password")
    cur = conn.cursor()

    query = "INSERT INTO your_table (column1, column2) VALUES (%s, %s)"
    data = [(1, 'abc'), (2, 'def'), (3, 'ghi')]

    cur.executemany(query, data)

    conn.commit()
    cur.close()
    conn.close()

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 12, 12),
    # other default args
}

dag = DAG('postgres_batch_insert', default_args=default_args, schedule_interval='@daily')

insert_task = PythonOperator(
    task_id='insert_to_postgres',
    python_callable=batch_insert_to_postgres,
    dag=dag,
)

insert_task

하지만 데이터가 엄청나게 많은 데이터 같은 경우(Billion level), 저 같은 경우는 대부분 Spark를 사용해서 로딩을 합니다. 개인용 프로젝트로는 위의 코드처럼 하시면 충분 하실 듯 합니다.

1

JP

답변감사합니다!! ㅠㅠ

추후 큰 데이터로 spark 로딩도 해봐야겠네요!!

빠른 답변 감사합니다!!

1

JP

조금 우여곡절이 있었지만 조언해주신 덕분에 DAG 구동 성공했습니다!
감사합니다 ㅠㅠ

image

작업형2 모의문제1 (30강)

0

10

1

섹션 5 CSS selector사용해서 클로링하기2의 커리큘럼 일정 부재?

0

11

2

가상환경 초반 에러_create name

0

17

1

apache airflow 설치하기 질문

0

86

2

postgres_loader DAG 에러

1

54

3

Queue 강의를 듣고 난 후에 대한 질의

0

51

1

공식 compose 내 postgres db 설치시

0

76

2

postgres_loader의 apache-airflow-providers-postgres 버전 호환성

1

184

2

Airflow Limitation 강의에 대한 질문

1

93

1

airflow와 postgres간의 connection 오류

1

1147

4

from airflow.sensors.sql import SqlSensor에 대해 질문 있습니다.

1

254

1

메타데이터 의미

1

368

2

병렬처리 질문드립니다.

1

508

2

connection 정보 이전 방법 질문드립니다.

1

278

1

강의 할인 프로모션 질문입니다..

1

283

1

hook 질문드립니다.

1

348

1

section 2-hook 강의 질문

1

290

1

airflow tasks test 질문드립니다!

1

509

3

airflow docker compose 질문드립니다.

1

419

1

섹션1 apache airflow 설치하기 질문

1

463

1

my_first_dag.py 파일 질문 입니다

1

399

1

Docker 에서 airflow 사용시 질문드립니다

1

958

2

airflow tasks test error

2

577

1

블로그에 글을 올려도 되나요?

0

486

2