인프런 커뮤니티 질문&답변
forloop으로 task 정의시 task_id 정해지는 로직
작성
·
20
·
수정됨
0
안녕하세요. 49강 수강 중 질문이 있습니다.
for table_name in TABLES.keys():
extract_from_postgres(postgres_schema, table_name) >> load_to_snowflake(snowflake_schema, table_name)현재 강사님이 주신 이 코드 기준 테이블 2개 tasks 2개 해서 총 4개의 tasks가 airflow tasks list의 결과로 반환되었는데요, 이때의 결과물인 tasks_id가 어떻게 만들어지는지 궁금합니다.
조금 더 정확히는 forloop으로 task를 정의할 때 어떤 식으로 DAG가 이 task의 개수를 세고 네이밍을 하는지 궁금합니다.
혼자 테스트를 해보고 싶어서 임의의 테이블 하나를 postgres:production에 추가하고 코드내부의 TABLES 딕셔너리에 제가 추가한 테이블의 스키마를 추가하였습니다. 이때 테이블이 총 3개가 되었으므로 airflow tasks list의 결과가 총 6개가 될 것으로 예상하였는데 여전히 4개로 나옵니다. 제가 놓친 부분이 있을까요?
현재 production schema아래 3개의 테이블이 있는 상태입니다.
airflow=# SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'production'
AND table_type = 'BASE TABLE'
ORDER BY table_name;
table_name
------------------------
session_timestamp
user_session_channel
user_session_channel_2
(3 rows)감사합니다.
학습 관련 질문을 상세하게 남겨주시면 더 좋습니다. 예를 들어 이해가 안 가는 부분이 있다고 하면 강의에서 어느 부분인지 어떤 부분이 이해가 안되는지 등등 추가 정보가 큰 도움이 됩니다. 그리고 에러가 난다면 어떤 에러 메시지가 나오는지 같이 공유해주세요.
혹시라도 유사한 질문이 있었는지 먼저 확인 부탁 드리겠습니다.
서로 예의를 지키며 존중하는 문화를 만들어갔으면 하고 인프런 서비스 운영 관련 문의는 1:1 문의하기를 이용해주세요.
답변 3
1
앗 제가 바보같은 실수를 했습니다.. dags_to_move 아래의 파일을 수정하고 있었네요. dags아래의 파일을 수정하니 tasks의 개수가 자동으로 늘어나고 task_id는 <function_name>_<idx>로 자동생성됩니다. 감사합니다!
0
0
안녕하세요, 인프런 AI 인턴입니다. avocado님.
Airflow에서 for loop를 사용하여 여러 테이블에 대해 동일한 태스크를 생성하고 연결하는 과정에 대해 질문 주셨습니다. 주어진 코드에서 TABLES 딕셔너리는 테이블 이름을 키로 가지며, 이 반복문은 각 테이블에 대해 extract_from_postgres와 load_to_snowflake라는 태스크를 생성합니다. Airflow에서의 태스크 ID(task_id)는 일반적으로 оператор 인스턴스 생성 시 자동으로 부여되며, 이들의 패턴은 주로 ‘연산자_태스크_이름’ 형태로 설정됩니다.
테이블을 추가했는데도 태스크가 예상대로 생성되지 않는 경우에는 다음과 같은 부분을 확인해 보세요:
DAG의 설정과 실행: 테이블을 추가한 후 DAG를 다시 활성화해서 실행했는지 확인해 보세요. DAG는 변경사항이 있을 때 이를 감지하고 태스크를 새로 생성해야 합니다.
기존 태스크 영향 여부: 기존의 태스크나 DAG 설정이 추가된 테이블에 대한 작업을 수행하는지 잘 확인합니다. 예를 들어, 태스크가 서로 의존성이 있거나 스케줄러에 의해 관리되고 있을 수 있습니다.
Airflow 로그 확인: Airflow UI에서 로그를 확인하여 특정 문제나 테이블 추가가 반영되지 않은 원인을 찾을 수 있습니다.
테이블을 여러 개 다룰 때, 각 태스크의 ID는 extract_from_postgres_<table_name>과 같은 형태로 설정될 수 있으며, 각각의 task_id는 고유해야 하는 요구 조건이 있습니다. 혹시 ID 충돌이나 다른 이름 설정 문제로 인해 태스크가 추가되지 않는 경우도 있을 수 있습니다.
비슷한 주제를 다룬 기존의 질문이나 스레드를 참조하시면 더 깊이 있는 정보 습득에 도움이 될 것입니다. 예를 들어:
- Airflow 태스크 및 DAG 설정에 대한 실습 질문
저는 질문자님의 이해를 돕고자 지식공유자님의 기존 답변을 참고하여 유사한 답변 링크를 추천해드리고 있습니다.
현재 베타 기능으로 답변이 만족스럽지 않을 수 있는 점 양해 부탁드립니다. 🙏
추가적으로 궁금한 점이 있으시면, 이어서 질문해 주세요. 곧 지식공유자께서 답변해 주실 것입니다.





