inflearn logo
강의

Khóa học

Chia sẻ kiến thức

Apache Airflow cùng với các kỹ sư Thung lũng Silicon

PostgresOperator로 대량의 데이터 업로드 방법 질문드립니다.

Đã giải quyết

395

JP

10 câu hỏi đã được viết

1

안녕하세요 선생님 🙂

PostgresOperator 질문 드립니다.

DB table에 데이터를 갱신하는 task를 혼자 만들어보고 있는데요.

PostgresOperator는 executemany와 같은 기능을 지원하지 않는 것으로 확인했습니다.

airflow에서 대량의 데이터를 insert / update 하는 방법이 있을까요..?

python 빅데이터 데이터-엔지니어링 airflow

Câu trả lời 1

1

altoformula

안녕하세요 JP님,

일단 executemany같은 경우 PythonOperator를 사용하시고 psycopg2를 이용하시면 될 듯 힙니다. 대충 코드를 만들어 보자면 ...

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
import psycopg2

def batch_insert_to_postgres():
    conn = psycopg2.connect("dbname=your_db user=your_user password=your_password")
    cur = conn.cursor()

    query = "INSERT INTO your_table (column1, column2) VALUES (%s, %s)"
    data = [(1, 'abc'), (2, 'def'), (3, 'ghi')]

    cur.executemany(query, data)

    conn.commit()
    cur.close()
    conn.close()

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 12, 12),
    # other default args
}

dag = DAG('postgres_batch_insert', default_args=default_args, schedule_interval='@daily')

insert_task = PythonOperator(
    task_id='insert_to_postgres',
    python_callable=batch_insert_to_postgres,
    dag=dag,
)

insert_task

하지만 데이터가 엄청나게 많은 데이터 같은 경우(Billion level), 저 같은 경우는 대부분 Spark를 사용해서 로딩을 합니다. 개인용 프로젝트로는 위의 코드처럼 하시면 충분 하실 듯 합니다.

1

JP

답변감사합니다!! ㅠㅠ

추후 큰 데이터로 spark 로딩도 해봐야겠네요!!

빠른 답변 감사합니다!!

1

JP

조금 우여곡절이 있었지만 조언해주신 덕분에 DAG 구동 성공했습니다!
감사합니다 ㅠㅠ

image

모델 서빙과 관련된 강좌 출시 예정된 바가 있으신지 여쭤봅니다!

0

1

0

모델 서빙과 관련된 강좌가 출시되는지 질문드립니다.

0

1

0

20번강좌에 대한 질문입니다.

0

5

1

apache airflow 설치하기 질문

0

85

2

postgres_loader DAG 에러

1

52

3

Queue 강의를 듣고 난 후에 대한 질의

0

51

1

공식 compose 내 postgres db 설치시

0

75

2

postgres_loader의 apache-airflow-providers-postgres 버전 호환성

1

184

2

Airflow Limitation 강의에 대한 질문

1

92

1

airflow와 postgres간의 connection 오류

1

1146

4

from airflow.sensors.sql import SqlSensor에 대해 질문 있습니다.

1

254

1

메타데이터 의미

1

367

2

병렬처리 질문드립니다.

1

508

2

connection 정보 이전 방법 질문드립니다.

1

278

1

강의 할인 프로모션 질문입니다..

1

282

1

hook 질문드립니다.

1

347

1

section 2-hook 강의 질문

1

289

1

airflow tasks test 질문드립니다!

1

509

3

airflow docker compose 질문드립니다.

1

419

1

섹션1 apache airflow 설치하기 질문

1

462

1

my_first_dag.py 파일 질문 입니다

1

399

1

Docker 에서 airflow 사용시 질문드립니다

1

956

2

airflow tasks test error

2

577

1

블로그에 글을 올려도 되나요?

0

486

2