Inflearn brand logo image

인프런 커뮤니티 질문&답변

이유진님의 프로필 이미지
이유진

작성한 질문수

Airflow 마스터 클래스

BranchPython 오퍼레이터로 분기처리하기

bashoperator는 t1.xcom_push가 안되나요?

해결된 질문

작성

·

42

0

import pendulum
from airflow.providers.standard.operators.bash import BashOperator
from airflow.sdk import DAG

with DAG(
    dag_id="dags_bash_with_xcom",
    schedule="10 0 * * *",
    start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
    catchup=False
) as dag:
    bash_push=BashOperator(
        task_id='bash_push',
        bash_command="echo START && "
                    "echo XCOM_PUSHED "
                    "{{ti.xcom_push(key='bash_pushed',value='first_bash_message') }} && "
                    "echo COMPLETE"
    )
    
    bash_pull=BashOperator(
        task_id='bash_pull',
        env={'PUSHED_VALUE':"{{ti.xcom_pull(key='bash_pushed')}}",
             'RETURN_VALUE':"{{ti.xcom_pull(task_ids='bash_push')}}"},
        bash_command="echo $PUSHED_VALUE && echo $RETURN_VALUE",
        do_xcom_push=False
    )
    
    bash_push >> bash_pull

이게 제가 돌린 코드입니다.

 

근데 PUSHED_VALUE값이 나오지 않습니다.

아래는 관련 로그입니다.

 

[2025-07-04, 10:31:39] INFO - DAG bundles loaded: dags-folder, example_dags: source="airflow.dag_processing.bundles.manager.DagBundlesManager"

[2025-07-04, 10:31:39] INFO - Filling up the DagBag from /opt/airflow/dags/dags_bash_with_xcom.py: source="airflow.models.dagbag.DagBag"

[2025-07-04, 10:31:39] WARNING - No XCom value found; defaulting to None.: key="bash_pushed": dag_id="dags_bash_with_xcom": task_id="bash_pull": run_id="manual__2025-07-04T01:31:36.786102+00:00": map_index=-1: source="task"

[2025-07-04, 10:31:39] INFO - Tmp dir root location: /tmp: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"

[2025-07-04, 10:31:39] INFO - Running command: ['/usr/bin/bash', '-c', 'echo $PUSHED_VALUE && echo $RETURN_VALUE']: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"

[2025-07-04, 10:31:39] INFO - Output:: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"

[2025-07-04, 10:31:39] INFO - None: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"

[2025-07-04, 10:31:39] INFO - COMPLETE: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"

[2025-07-04, 10:31:39] INFO - Command exited with return code 0: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"

[2025-07-04, 10:31:39] INFO - Task instance is in running state: chan="stdout": source="task"

[2025-07-04, 10:31:39] INFO - Previous state of the Task instance: TaskInstanceState.QUEUED: chan="stdout": source="task"

[2025-07-04, 10:31:39] INFO - Current task name:bash_pull: chan="stdout": source="task"

[2025-07-04, 10:31:39] INFO - Dag name:dags_bash_with_xcom: chan="stdout": source="task"

[2025-07-04, 10:31:39] INFO - Task instance in success state: chan="stdout": source="task"

[2025-07-04, 10:31:39] INFO - Previous state of the Task instance: TaskInstanceState.RUNNING: chan="stdout": source="task"

[2025-07-04, 10:31:39] INFO - Task operator:<Task(BashOperator): bash_pull>: chan="stdout": source="task"

답변 3

0

김현진님의 프로필 이미지
김현진
지식공유자

이유진님

제가 좀 확인해본 결과

2.10.5 버전까지는 PUSHED_VALUE 값이 잘 나옵니다. (아래는 제가 2.10.5에서 확인해본 결과입니다)

image.png

 

그런데 3.0.0 되면서부터 task_ids 값을 주지 않으면 xcom 값을 찾지 못하는 현상이 발생하는 듯 합니다.

혹시 3.0.0 부터 task_ids 값을 필수로 주도록 바꼈나? 해서 코드를 확인해봤지만 아직까지 task_ids 값이 필수인 것으로 바뀐 코드는 없습니다. 여전히 task_ids 값은 선택적이고 None 값을 줘도 되는 것으로 보입니다.

아래는 TaskInstance 클래스의 xcom_pull 함수에서 Xcom 값을 꺼내올 때 사용하는 XComModel.get_many 함수의 내용입니다.

 

image.png

 

3.0 버전에서도 task_ids 파라미터는 Optional 하고 None 값을 주면 Xcom 반환 대상에서 필터만 제거된다고 설명이 되어있지만

task_ids=None 을 주면 Xcom을 찾지 못하는 버그로 보입니다.

일단 실습은 task_ids 값과 key 값을 같이 주면 Xcom을 찾아오고 있으니 당분간은 task_ids 값을 넣어야 할 듯 합니다.

 

향후 Airflow 버전이 올라가면서 task_ids 값을 필수로 넣도록 바꾸거나 아니면 현재 나타나는 버그가 수정될 듯 합니다.

 

task_ids 없이 key만 줬을 때 xcom 을 찾아오지 못하는 현상은 BashOperator 에서 뿐만 아니라 PythonOperator 등 다른 오퍼레이터에서도 공통적으로 나타나고 있습니다. 관련된 영상에는 수업노트에 본 내용을 참고사항으로 추가하도록 하겠습니다.

 

질문주셔서 감사드립니다 ^-^

이유진님의 프로필 이미지
이유진
질문자

네 답변 감사합니다!

 

0

김현진님의 프로필 이미지
김현진
지식공유자

안녕하세요 이유진님

PUSHED_VALUE의 경우 None으로 나오는 걸 확인했고, 아마도 버전이 바뀌면서 xcom_pull 할 때 task_ids 값을 주지 않으면 안되는 것으로 보입니다.

제가 좀 더 확인해본 후 코드 수정 & 안내 설정하고 다시 답변 드릴께요.

감사합니다.

0

안녕하세요, 인프런 AI 인턴입니다.

이유진님, BashOperator 사용 시 XCom을 직접적으로 가져오거나 푸쉬하는 것이 아닌, 주로 다른 방법을 사용하여 값을 전달해야 합니다. BashOperator에서 XCom을 통해 값을 전달하려면, bash 명령어가 출력을 통해 값을 전달하고 그 값을 XCom에 저장되도록 해야 합니다.

지금 코드를 보면, bash_push에서 XCom으로 값을 푸쉬하려고 하지만, 실제로는 bash_command에 포함된 {{ ti.xcom_push(...) }} 표현식이 bash 명령어 실행 전에 Jinja 템플릿에 의해 평가됩니다. 그 결과, BashOperator에서는 이 표현식이 올바르게 실행되지 않습니다.

각 BashOperator는 명령어 실행 후 표준 출력에 쓰인 값을 자동으로 XCom에 푸시하기 때문에, 이를 활용하여 값을 전달하거나 가져와야 합니다. 예를 들어, bash_push에서 echo "first_bash_message" >> /tmp/bash_message.txt로 파일에 기록하고, bash_pull에서 읽어서 사용하거나, 두 번째 방법으로는 bash_push에서 출력을 통해 바로 값을 전달하고 bash_pull에서 XCom으로 받아오는 방법이 있습니다.

해결 방법으로 bash_push 태스크의 bash_commandecho "first_bash_message"를 넣고, bash_pull 태스크에서는 env 환경변수를 설정하지 않고 bash_command에서 $(< /tmp/bash_message.txt)와 같이 cat 명령어를 활용해 값을 읽어오는 방법으로 수정해 볼 수 있습니다.

또한 관련된 글을 몇 가지 추천드립니다. 아래 링크들을 확인해 보시면 도움이 될 것입니다.

  1. BashOperator와 XCom 사용 방법

저는 질문자님의 이해를 돕고자 지식공유자님의 기존 답변을 참고하여 유사한 답변 링크를 추천해드리고 있습니다. 현재 베타 기능으로 답변이 만족스럽지 않을 수 있는 점 양해 부탁드립니다. 🙏 추가적으로 궁금한 점이 있으시면, 이어서 질문해 주세요. 곧 지식공유자께서 답변해 주실 것입니다.

이유진님의 프로필 이미지
이유진

작성한 질문수

질문하기