해결된 질문
작성
·
42
0
import pendulum
from airflow.providers.standard.operators.bash import BashOperator
from airflow.sdk import DAG
with DAG(
dag_id="dags_bash_with_xcom",
schedule="10 0 * * *",
start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
catchup=False
) as dag:
bash_push=BashOperator(
task_id='bash_push',
bash_command="echo START && "
"echo XCOM_PUSHED "
"{{ti.xcom_push(key='bash_pushed',value='first_bash_message') }} && "
"echo COMPLETE"
)
bash_pull=BashOperator(
task_id='bash_pull',
env={'PUSHED_VALUE':"{{ti.xcom_pull(key='bash_pushed')}}",
'RETURN_VALUE':"{{ti.xcom_pull(task_ids='bash_push')}}"},
bash_command="echo $PUSHED_VALUE && echo $RETURN_VALUE",
do_xcom_push=False
)
bash_push >> bash_pull
이게 제가 돌린 코드입니다.
근데 PUSHED_VALUE값이 나오지 않습니다.
아래는 관련 로그입니다.
[2025-07-04, 10:31:39] INFO
- DAG bundles loaded: dags-folder, example_dags: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-07-04, 10:31:39] INFO
- Filling up the DagBag from /opt/airflow/dags/dags_bash_with_xcom.py: source="airflow.models.dagbag.DagBag"
[2025-07-04, 10:31:39] WARNING
- No XCom value found; defaulting to None.: key="bash_pushed": dag_id="dags_bash_with_xcom": task_id="bash_pull": run_id="manual__2025-07-04T01:31:36.786102+00:00": map_index=-1: source="task"
[2025-07-04, 10:31:39] INFO
- Tmp dir root location: /tmp: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-07-04, 10:31:39] INFO
- Running command: ['/usr/bin/bash', '-c', 'echo $PUSHED_VALUE && echo $RETURN_VALUE']: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-07-04, 10:31:39] INFO
- Output:: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-07-04, 10:31:39] INFO
- None: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-07-04, 10:31:39] INFO
- COMPLETE: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-07-04, 10:31:39] INFO
- Command exited with return code 0: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-07-04, 10:31:39] INFO
- Task instance is in running state: chan="stdout": source="task"
[2025-07-04, 10:31:39] INFO
- Previous state of the Task instance: TaskInstanceState.QUEUED: chan="stdout": source="task"
[2025-07-04, 10:31:39] INFO
- Current task name:bash_pull: chan="stdout": source="task"
[2025-07-04, 10:31:39] INFO
- Dag name:dags_bash_with_xcom: chan="stdout": source="task"
[2025-07-04, 10:31:39] INFO
- Task instance in success state: chan="stdout": source="task"
[2025-07-04, 10:31:39] INFO
- Previous state of the Task instance: TaskInstanceState.RUNNING: chan="stdout": source="task"
[2025-07-04, 10:31:39] INFO
- Task operator:<Task(BashOperator): bash_pull>: chan="stdout": source="task"
답변 3
0
이유진님
제가 좀 확인해본 결과
2.10.5 버전까지는 PUSHED_VALUE 값이 잘 나옵니다. (아래는 제가 2.10.5에서 확인해본 결과입니다)
그런데 3.0.0 되면서부터 task_ids 값을 주지 않으면 xcom 값을 찾지 못하는 현상이 발생하는 듯 합니다.
혹시 3.0.0 부터 task_ids 값을 필수로 주도록 바꼈나? 해서 코드를 확인해봤지만 아직까지 task_ids 값이 필수인 것으로 바뀐 코드는 없습니다. 여전히 task_ids 값은 선택적이고 None 값을 줘도 되는 것으로 보입니다.
아래는 TaskInstance 클래스의 xcom_pull 함수에서 Xcom 값을 꺼내올 때 사용하는 XComModel.get_many 함수의 내용입니다.
3.0 버전에서도 task_ids 파라미터는 Optional 하고 None 값을 주면 Xcom 반환 대상에서 필터만 제거된다고 설명이 되어있지만
task_ids=None 을 주면 Xcom을 찾지 못하는 버그로 보입니다.
일단 실습은 task_ids 값과 key 값을 같이 주면 Xcom을 찾아오고 있으니 당분간은 task_ids 값을 넣어야 할 듯 합니다.
향후 Airflow 버전이 올라가면서 task_ids 값을 필수로 넣도록 바꾸거나 아니면 현재 나타나는 버그가 수정될 듯 합니다.
task_ids 없이 key만 줬을 때 xcom 을 찾아오지 못하는 현상은 BashOperator 에서 뿐만 아니라 PythonOperator 등 다른 오퍼레이터에서도 공통적으로 나타나고 있습니다. 관련된 영상에는 수업노트에 본 내용을 참고사항으로 추가하도록 하겠습니다.
질문주셔서 감사드립니다 ^-^
0
안녕하세요 이유진님
PUSHED_VALUE의 경우 None으로 나오는 걸 확인했고, 아마도 버전이 바뀌면서 xcom_pull 할 때 task_ids 값을 주지 않으면 안되는 것으로 보입니다.
제가 좀 더 확인해본 후 코드 수정 & 안내 설정하고 다시 답변 드릴께요.
감사합니다.
0
안녕하세요, 인프런 AI 인턴입니다.
이유진님, BashOperator 사용 시 XCom을 직접적으로 가져오거나 푸쉬하는 것이 아닌, 주로 다른 방법을 사용하여 값을 전달해야 합니다. BashOperator
에서 XCom을 통해 값을 전달하려면, bash 명령어가 출력을 통해 값을 전달하고 그 값을 XCom에 저장되도록 해야 합니다.
지금 코드를 보면, bash_push
에서 XCom
으로 값을 푸쉬하려고 하지만, 실제로는 bash_command
에 포함된 {{ ti.xcom_push(...) }}
표현식이 bash 명령어 실행 전에 Jinja 템플릿에 의해 평가됩니다. 그 결과, BashOperator에서는 이 표현식이 올바르게 실행되지 않습니다.
각 BashOperator는 명령어 실행 후 표준 출력에 쓰인 값을 자동으로 XCom에 푸시하기 때문에, 이를 활용하여 값을 전달하거나 가져와야 합니다. 예를 들어, bash_push
에서 echo "first_bash_message" >> /tmp/bash_message.txt
로 파일에 기록하고, bash_pull
에서 읽어서 사용하거나, 두 번째 방법으로는 bash_push
에서 출력을 통해 바로 값을 전달하고 bash_pull
에서 XCom으로 받아오는 방법이 있습니다.
해결 방법으로 bash_push
태스크의 bash_command
에 echo "first_bash_message"
를 넣고, bash_pull
태스크에서는 env
환경변수를 설정하지 않고 bash_command
에서 $(< /tmp/bash_message.txt)
와 같이 cat
명령어를 활용해 값을 읽어오는 방법으로 수정해 볼 수 있습니다.
또한 관련된 글을 몇 가지 추천드립니다. 아래 링크들을 확인해 보시면 도움이 될 것입니다.
저는 질문자님의 이해를 돕고자 지식공유자님의 기존 답변을 참고하여 유사한 답변 링크를 추천해드리고 있습니다. 현재 베타 기능으로 답변이 만족스럽지 않을 수 있는 점 양해 부탁드립니다. 🙏 추가적으로 궁금한 점이 있으시면, 이어서 질문해 주세요. 곧 지식공유자께서 답변해 주실 것입니다.
네 답변 감사합니다!