강의

멘토링

커뮤니티

인프런 커뮤니티 질문&답변

avocado님의 프로필 이미지
avocado

작성한 질문수

실리콘밸리 데이터 리더가 알려주는 Airflow 기초

Postgres 테이블 읽어오기 - 실습

forloop으로 task 정의시 task_id 정해지는 로직

작성

·

20

·

수정됨

0

안녕하세요. 49강 수강 중 질문이 있습니다.

    for table_name in TABLES.keys():
        extract_from_postgres(postgres_schema, table_name) >> load_to_snowflake(snowflake_schema, table_name)

현재 강사님이 주신 이 코드 기준 테이블 2개 tasks 2개 해서 총 4개의 tasks가 airflow tasks list의 결과로 반환되었는데요, 이때의 결과물인 tasks_id가 어떻게 만들어지는지 궁금합니다.

조금 더 정확히는 forloop으로 task를 정의할 때 어떤 식으로 DAG가 이 task의 개수를 세고 네이밍을 하는지 궁금합니다.

혼자 테스트를 해보고 싶어서 임의의 테이블 하나를 postgres:production에 추가하고 코드내부의 TABLES 딕셔너리에 제가 추가한 테이블의 스키마를 추가하였습니다. 이때 테이블이 총 3개가 되었으므로 airflow tasks list의 결과가 총 6개가 될 것으로 예상하였는데 여전히 4개로 나옵니다. 제가 놓친 부분이 있을까요?

현재 production schema아래 3개의 테이블이 있는 상태입니다.

airflow=# SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'production'
  AND table_type = 'BASE TABLE'
ORDER BY table_name;
       table_name       
------------------------
 session_timestamp
 user_session_channel
 user_session_channel_2
(3 rows)

감사합니다.

 

  • 학습 관련 질문을 상세하게 남겨주시면 더 좋습니다. 예를 들어 이해가 안 가는 부분이 있다고 하면 강의에서 어느 부분인지 어떤 부분이 이해가 안되는지 등등 추가 정보가 큰 도움이 됩니다. 그리고 에러가 난다면 어떤 에러 메시지가 나오는지 같이 공유해주세요.

     

  • 혹시라도 유사한 질문이 있었는지 먼저 확인 부탁 드리겠습니다.

     

  • 서로 예의를 지키며 존중하는 문화를 만들어갔으면 하고 인프런 서비스 운영 관련 문의는 1:1 문의하기를 이용해주세요.

답변 3

1

avocado님의 프로필 이미지
avocado
질문자

앗 제가 바보같은 실수를 했습니다.. dags_to_move 아래의 파일을 수정하고 있었네요. dags아래의 파일을 수정하니 tasks의 개수가 자동으로 늘어나고 task_id는 <function_name>_<idx>로 자동생성됩니다. 감사합니다!

0

한기용님의 프로필 이미지
한기용
지식공유자

제가 답하기 전에 해결하셨군요. 다행입니다. 또 질문 생기면 편하게 주세요!

0

안녕하세요, 인프런 AI 인턴입니다. avocado님.

Airflow에서 for loop를 사용하여 여러 테이블에 대해 동일한 태스크를 생성하고 연결하는 과정에 대해 질문 주셨습니다. 주어진 코드에서 TABLES 딕셔너리는 테이블 이름을 키로 가지며, 이 반복문은 각 테이블에 대해 extract_from_postgresload_to_snowflake라는 태스크를 생성합니다. Airflow에서의 태스크 ID(task_id)는 일반적으로 оператор 인스턴스 생성 시 자동으로 부여되며, 이들의 패턴은 주로 ‘연산자_태스크_이름’ 형태로 설정됩니다.

테이블을 추가했는데도 태스크가 예상대로 생성되지 않는 경우에는 다음과 같은 부분을 확인해 보세요:

  1. DAG의 설정과 실행: 테이블을 추가한 후 DAG를 다시 활성화해서 실행했는지 확인해 보세요. DAG는 변경사항이 있을 때 이를 감지하고 태스크를 새로 생성해야 합니다.

  2. 기존 태스크 영향 여부: 기존의 태스크나 DAG 설정이 추가된 테이블에 대한 작업을 수행하는지 잘 확인합니다. 예를 들어, 태스크가 서로 의존성이 있거나 스케줄러에 의해 관리되고 있을 수 있습니다.

  3. Airflow 로그 확인: Airflow UI에서 로그를 확인하여 특정 문제나 테이블 추가가 반영되지 않은 원인을 찾을 수 있습니다.

테이블을 여러 개 다룰 때, 각 태스크의 ID는 extract_from_postgres_<table_name>과 같은 형태로 설정될 수 있으며, 각각의 task_id는 고유해야 하는 요구 조건이 있습니다. 혹시 ID 충돌이나 다른 이름 설정 문제로 인해 태스크가 추가되지 않는 경우도 있을 수 있습니다.

비슷한 주제를 다룬 기존의 질문이나 스레드를 참조하시면 더 깊이 있는 정보 습득에 도움이 될 것입니다. 예를 들어:
- Airflow 태스크 및 DAG 설정에 대한 실습 질문

저는 질문자님의 이해를 돕고자 지식공유자님의 기존 답변을 참고하여 유사한 답변 링크를 추천해드리고 있습니다.
현재 베타 기능으로 답변이 만족스럽지 않을 수 있는 점 양해 부탁드립니다. 🙏
추가적으로 궁금한 점이 있으시면, 이어서 질문해 주세요. 곧 지식공유자께서 답변해 주실 것입니다.

avocado님의 프로필 이미지
avocado

작성한 질문수

질문하기