작성자 없음
작성자 정보가 삭제된 글입니다.
작성
·
40
0
import pendulum
from airflow.sdk import DAG, task
with DAG(
dag_id="dags_python_with_macro",
schedule="10 0 * * *",
start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
catchup=False
) as dag:
@task(task_id='task_using_macros',
templates_dict={'start_date':'{{(data_interval_end.in_timezone("Asia/Seoul") + macros.dateutil.relativedelta.relativedelta(months=-1, day=1)) | ds }}',
'end_date': '{{(data_interval_end.in_timezone("Asia/Seoul").replace(day=1) + macros.dateutil.relativedelta.relativedelta(days=-1)) | ds }}'
}
)
def get_datetime_macro(**kwargs):
templates_dict=kwargs.get('templates_dict') or {}
if templates_dict:
start_date=templates_dict.get('start_date') or 'start_date 없음'
end_date=templates_dict.get('end_date') or 'end_date 없음'
print(start_date)
print(end_date)
@task(task_id='task_direct_calc')
def get_datetime_calc(**kwargs):
from dateutil.relativedelta import relativedelta
data_interval_end=kwargs['data_interval_end']
prev_month_day_first=data_interval_end.in_timezone('Asia/Seoul') + relativedelta(months=-1,day=1)
prev_month_day_last = data_interval_end.in_timezone('Asia/Seoul').replace(day=1) + relativedelta(days=-1)
print(prev_month_day_first.strftime('%Y-%m-%d'))
print(prev_month_day_last.strftime('%Y-%m-%d'))
get_datetime_macro() >> get_datetime_calc()
수업 내용에 따라서 했는데 17번줄에서 오류가 나서 딕셔너리 호출을 못한다고 합니다.
chat gpt한테 물어보니 이제는 인자로 호출을 해야한다고해서..맞는지 확인 부탁드립니다.
답변 2
0
0
안녕하세요, 인프런 AI 인턴입니다. 이유진님.
현재 작성하신 DAG 코드에서 문제는 두 가지 부분에서 발생할 수 있습니다. 확인한 결과, Airflow에서 사용하는 task
데코레이터와 관련된 호출 방식 또는 사용 버전의 차이에 따라 문제가 발생할 수 있습니다.
Airflow 버전:
@task
데코레이터의 사용 방식이나 templates_dict
의 지원이 다르게 작동할 수 있습니다. 최신 버전에서는 @task
데이터 전송 방식이 변경되었을 가능성도 있습니다.airflow.sdk
의존성이 정확히 어떤 버전을 참조하고 있는지 확인이 필요할 수도 있습니다.templates_dict 형식문제:
templates_dict
내의 매크로(template) 사용법이 간혹 예상과 다르게 작동할 수 있습니다. 특히, 변수 연산 및 매크로 결합 사용 시 문제가 발생할 수 있습니다.@task 데코레이터 점검:
@task
데코레이터의 지원 항목이 다를 수 있으므로, 공식 문서나 변경 내역을 참고하는 것이 좋습니다. 버전에 따른 API 변동이 있다면 @task
대신 PythonOperator
를 사용하여 시도해 볼 수 있습니다.예제 코드 개선:
import pendulum
from airflow import DAG
from airflow.decorators import task
from airflow.utils.dates import days_ago
from dateutil.relativedelta import relativedelta
with DAG(
dag_id="dags_python_with_macro",
schedule="10 0 * * *",
start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
catchup=False,
) as dag:
@task(task_id='task_using_macros')
def get_datetime_macro(**kwargs):
data_interval_end = kwargs['data_interval_end']
start_date = (data_interval_end.in_timezone("Asia/Seoul") + relativedelta(months=-1, day=1)).strftime('%Y-%m-%d')
end_date = (data_interval_end.in_timezone("Asia/Seoul").replace(day=1) + relativedelta(days=-1)).strftime('%Y-%m-%d')
print(f'Start date: {start_date}')
print(f'End date: {end_date}')
@task(task_id='task_direct_calc')
def get_datetime_calc(**kwargs):
data_interval_end = kwargs['data_interval_end']
prev_month_day_first = data_interval_end.in_timezone('Asia/Seoul') + relativedelta(months=-1, day=1)
prev_month_day_last = data_interval_end.in_timezone('Asia/Seoul').replace(day=1) + relativedelta(days=-1)
print(prev_month_day_first.strftime('%Y-%m-%d'))
print(prev_month_day_last.strftime('%Y-%m-%d'))
get_datetime_macro() >> get_datetime_calc()
이 예제에서는 상속된 DAG과 직접적 템플릿 출력 부분을 Python 내부 도구로 적용하여 오류를 최소화하였습니다.
문제가 지속된다면, 사용하는 정확한 Airflow 버전 정보를 확인하고, 해당 버전의 릴리즈 노트를 통해 변경 사항을 검토하는 것이 좋겠습니다.
저는 질문자님의 이해를 돕고자 지식공유자님의 기존 답변을 참고하여 유사한 답변 링크를 추천해드리고 있습니다.
현재 베타 기능으로 답변이 만족스럽지 않을 수 있는 점 양해 부탁드립니다. 🙏
추가적으로 궁금한 점이 있으시면, 이어서 질문해 주세요. 곧 지식공유자께서 답변해 주실 것입니다.