dags_external_task_sensor 오류 질문
362
작성한 질문수 87
선생님 안녕하세요
좋은 강의 감사합니다.
section 9 dags_external_task_sensor 에서, task b 가 fail로 뜨지 않고 계속 running 인 오류가 나는 데 이유를 모르겠습니다.
dags_branch_python_operator는 아래와 같습니다.
from airflow import DAG
import pendulum
from airflow.operators.python import PythonOperator
from airflow.operators.python import BranchPythonOperator
with DAG(
dag_id='dags_branch_python_operator',
start_date=pendulum.datetime(2023,4,1, tz='Asia/Seoul'),
schedule='0 1 * * *',
catchup=False
) as dag:
def select_random():
import random
item_lst = ['A','B','C']
selected_item = random.choice(item_lst)
# 만약 실행해야 하는 task가 하나라면 task_id를 str 으로 하나만 넣는다.
# 만약 실행해야 하는 task가 두개 이상이라면 list of str을 넣는다.
if selected_item == 'A':
return 'task_a'
elif selected_item in ['B','C']:
return ['task_b','task_c']
python_branch_task = BranchPythonOperator(
task_id='python_branch_task',
python_callable=select_random
)
def common_func(**kwargs):
print(kwargs['selected'])
task_a = PythonOperator(
task_id='task_a',
python_callable=common_func,
op_kwargs={'selected':'A'}
)
task_b = PythonOperator(
task_id='task_b',
python_callable=common_func,
op_kwargs={'selected':'B'}
)
task_c = PythonOperator(
task_id='task_c',
python_callable=common_func,
op_kwargs={'selected':'C'}
)
python_branch_task >> [task_a, task_b, task_c]마지막으로 돌린 기록은 a를 선택하고, b,c 는 skipped 된 상태입니다.
dags_external_task_sensor 는 아래와 같고요
from airflow import DAG
from airflow.sensors.external_task import ExternalTaskSensor
import pendulum
from datetime import timedelta
from airflow.utils.state import State
with DAG(
dag_id='dags_external_task_sensor',
start_date=pendulum.datetime(2023,4,1, tz='Asia/Seoul'),
schedule='0 7 * * *',
catchup=False
) as dag:
external_task_sensor_a = ExternalTaskSensor(
task_id='external_task_sensor_a',
external_dag_id = 'dags_branch_python_operator',
external_task_id='task_a',
allowed_states=[State.SKIPPED], # task_a 가 skipped로 되면 sensor_a task는 success로 표시된다는 뜻
# allowed states 조건을 만족하지 못하면 계속 실행된다. 10초마다
execution_delta=timedelta(hours=6),
poke_interval=10 # 10초
)
external_task_sensor_b = ExternalTaskSensor(
task_id='external_task_sensor_b',
external_dag_id = 'dags_branch_python_operator',
external_task_id='task_b',
failed_states=[State.SKIPPED], # task_b 가 skipped로 되면 sensor_b task는 failed로 표시된다는 뜻
execution_delta=timedelta(hours=6),
poke_interval=10
)
external_task_sensor_c = ExternalTaskSensor(
task_id='external_task_sensor_c',
external_dag_id = 'dags_branch_python_operator',
external_task_id='task_c',
allowed_states=[State.SUCCESS], # task_c 가 success로 되면 sensor_c task는 success로 표시된다는 뜻
# success가 뜰때까지 꼐속 시도를 한다.
execution_delta=timedelta(hours=6),
poke_interval=10
)
이대로라면 강의에서 나온것 처럼 , b만 fail로 뜨고 a,c는 계속 running 이어야 하는데요, 셋다 running 이 나옵니다. log를 보면 계속 b를 poke만 하고 있더라고요
혹시 무엇이 문제일까요..?ㅠ
답변 1
0
안녕하세요 Nathan님
Task 3개가 모두 running으로 뜨고 있는것은 모니터링할 대상 DAG의 시간대를 못찾았기 때문입니다.
못 찾고있는 이유는 dags_external_task_sensor DAG을 Manual 로 실행시켰기 때문이에요.
01/07 13:39 분에 manual로 dags_external_task_sensor DAG을 실행시켰고 RUN_ID 는 manual__2024-01-07T04:39:13.768757+00:00 로 나옵니다. (manual__2024-01-07T13:39:13.768757+09:00)
모니터링할 대상 (dags_branch_python_operator)의 task 를 찾을 때는 위 RUN_ID 에 있는 시간보다 6시간 앞선 것을 찾으려 합니다.
그러므로 dags_branch_python_operator DAG 에서 data_interval_start가 01/06 07:39:13 (KST)에 수행된 것을 찾으려고 합니다.
그런데 dags_branch_python_operator DAG에는 그 시간에 수행된 Job이 없을테니 dags_external_task_sensor의 3개 task 모두 무한 대기 하는 것입니다.
external task sensor는 manual 로 수행했을 때 저런 문제가 있어요. 일단 실습을 제대로 해보려면 dags_external_task_sensor의 수행 이력 중 RUN_ID가 scheduled__2024-01-06T22:00:00+00:00인 RUN이 있을 거에요. 해당 Run을 선택하시고 clear 버튼을 눌러 재수행해보시겠어요? 그럼 sensing 대상 DAG인 dags_branch_python_operator DAG의 01/07 01:00 (KST)에 수행된 task를 sensing하게 될겁니다.
(물론 저 시간대에 수행된 이력이 있어야 합니다)
pykrx 회원제 전환으로 인한 실습 불가
0
109
2
수료증 발급
0
81
3
에러 발생 관련 질문드립니다.
0
80
2
vscode 작업화면에 오류가 발생하지 않습니다.
0
69
2
plugins 폴더 생성
0
74
2
WSL에서 git push 가 안되요 ㅠ
0
112
2
chatGPT&Airflow로 블로그 자동 포스팅하기 는 Deprecated 가 필요합니다.
0
66
2
github에 회원가입이 안되는데 원인이 뭔지 모르겠어요 ㅠ
0
721
2
설치중인데 venv를 꼭 써야할까요?
0
124
2
설치 버전 관련 질문입니다.
0
68
1
우분투 버전 다운받아야하는데 어떤걸로 설치해야할까요?
0
62
1
DAG 만들기 중 airflow 패키지 로드 에러
0
106
2
3.0에서도 수업노트가 성립하는지 확인 부탁드립니다!
0
95
3
task_id 사용법이 뭔가 바뀐 것 같습니다.
0
76
3
email operator 오류 관련
0
66
2
plugins 폴더 관련
0
95
2
bash operator 관련 문의입니다
0
78
3
스케쥴러 - DAG 파싱 부하 줄이는 과정 질문
0
79
2
Dags refresh 주기 관련 질문
0
123
2
wsl 관련 질문입니다.!
0
73
2
macOS에서 docker 설치
0
93
2
템플릿 변수에 대한 오류
0
61
2
custom_image 디렉토리 문의드립니다.
0
54
2
ETL 인터뷰 관련 문의
0
102
2





