강의

멘토링

커뮤니티

인프런 커뮤니티 질문&답변

dellahong님의 프로필 이미지
dellahong

작성한 질문수

토스 개발자와 함께하는 Data Workflow Management 기반의 대용량 데이터 처리 설계 패턴

Dynamic DAG 생성 패턴

강의에서사용하신 root.py 파일이 안보여서 실습하면서 만든 텍스트 공유 드려요

작성

·

8

·

수정됨

0

 dynamicDag

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from db_utils import execute_query

import logging

def log_task_start(message):
    logging.info(f"[START] {message}")

def log_task_end(message):
    logging.info(f"[END] {message}")

# 분석할 주식 종목 리스트 (설정으로 관리)
STOCK_SYMBOLS = ['AAPL','GOOGL','MSFT','AMZN','TSLA']

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

def analyze_stock_data(symbol, **context):
    """주식 종목에 대해서, 최근 30일간의 주식 데이터를 분석"""
    log_task_start(f"주식 분석 - {symbol}")

    query = """
    SELECT
        symbol,
        AVG(price) as avg_price,
        MAX(price) as max_price,
        MIN(price) as min_price,
        SUM(volume) as total_volume,
        COUNT(*) as trading_days
    FROM stock_prices
    WHERE symbol = %s
    AND trade_date >= DATE_SUB(CURDATE(), INTERVAL 30 DAY)
    GROUP BY symbol
    """

    result = execute_query(query, [symbol])

    if result:
        data = result[0]
        print(f"{symbol} 분석 결과:")
        print(f"  평균 가격: ${data[1]:.2f}")
        print(f"  최고 가격: ${data[2]:.2f}")
        print(f"  최저 가격: ${data[3]:.2f}")
        print(f"  총 거래량: {data[4]:,}")
        print(f"  거래일수: {data[5]}일")

    log_task_end(f"주식 분석 - {symbol}")
    return result

def create_stock_dag(symbol):
    """개별 주식에 대한 DAG 생성 함수"""

    dag_id = f'stock_analysis_{symbol.lower()}'

    dag = DAG(
        dag_id,
        default_args=default_args,
        description=f'{symbol} 주식 데이터 분석 DAG',
        schedule='@daily',
        catchup=False,
        tags=['dynamic', 'stock', 'analysis', symbol]
    )

    # 데이터 품질 체크 태스크
    def check_data_quality(**context):
        query = """
        SELECT COUNT(*) as count
        FROM stock_prices
        WHERE symbol = %s
        AND trade_date = CURDATE() - INTERVAL 1 DAY
        """
        result = execute_query(query, [symbol])
        logging.info(f"{symbol} 데이터 품질 체크: {result[0][0] if result else 0}")
        return result

    data_quality_check = PythonOperator(
        task_id=f'check_data_quality_{symbol.lower()}',
        python_callable=check_data_quality,
        dag=dag
    )

    # 주식 분석 태스크
    analyze_task = PythonOperator(
        task_id=f'analyze_{symbol.lower()}',
        python_callable=analyze_stock_data,
        op_kwargs={'symbol': symbol},
        dag=dag
    )

    # 결과 저장 태스크
    def save_results(**context):
        query = """
        INSERT INTO daily_stats (symbol, summary_date, avg_price, total_volume, trade_count)
        SELECT 
            %s,
            CURDATE() - INTERVAL 1 DAY,
            AVG(price),
            SUM(volume),
            COUNT(*)
        FROM stock_prices
        WHERE symbol = %s
        AND trade_date = CURDATE() - INTERVAL 1 DAY
        ON DUPLICATE KEY UPDATE
            avg_price = VALUES(avg_price),
            total_volume = VALUES(total_volume),
            trade_count = VALUES(trade_count)
        """        
        result = execute_query(query, [symbol, symbol])
        logging.info(f"{symbol} 분석 결과 저장 완료")
        return result

    save_results = PythonOperator(
        task_id=f'save_analysis_{symbol.lower()}',
        python_callable=save_results,
        dag=dag
    )

    # 태스크 의존성 설정
    data_quality_check >> analyze_task >> save_results

    return dag

# Dynamic DAG 생성
# 각 주식 종목에 대해 별도의 DAG 생성
for symbol in STOCK_SYMBOLS:
    dag_id = f'stock_analysis_{symbol.lower()}'
    globals()[dag_id] = create_stock_dag(symbol)

CrossDag (트리거 센서)

from datetime import datetime, timedelta
from airflow import DAG, Dataset
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sensors.external_task import ExternalTaskSensor
from db_utils import execute_query
import logging

default_args = {
    'owner': 'data-team',
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=2),
}

def process_data(**context):
    """간단한 데이터 처리"""
    logging.info("데이터 처리 시작")
    query = "SELECT COUNT(*) FROM stock_prices"

    result = execute_query(query)
    count = result[0][0] if result else 0

    logging.info(f"처리된 레코드 수 : ${count}")
    return {"record_count": count}


trigger_dag = DAG(
    'trigger_example_dag',
    default_args=default_args,
    description='TriggerDagRunOperator 예제',
    schedule='@daily',
    catchup=False,
    tags=['cross-dag', 'trigger']
)

process_task = PythonOperator(
    task_id='process_data',
    python_callable=process_data,
    dag=trigger_dag
)

trigger_next_dag = TriggerDagRunOperator(
    task_id='trigger_sensor_dag',
    trigger_dag_id='sensor_example_dag',
    wait_for_completion=False,   # 완료를 기다리지 않음
    dag=trigger_dag
)

process_task >> trigger_next_dag

# ========================================================

# 두 번째 DAG: 외부 태스크 대기
def analyze_triggered_data(**context):
    """트리거된 후 분석 작업"""
    logging.info("외부 태스크 완료 후 분석 시작")

    analysis_result = {
        "status": "completed",
        "timestamp": str(datetime.now())
    }

    logging.info(f"분석 완료: {analysis_result}")
    return analysis_result


sensor_dag = DAG(
    'sensor_example_dag',
    default_args=default_args,
    description='ExternalTaskSensor 예제',
    schedule=None,     # 트리거로만 실행됨
    catchup=False,
    tags=['cross-dag', 'sensor']
)

# 외부 DAG의 태스크 완료를 기다림
def get_most_recent_dag_run(dt):
    """가장 최근의 DAG 실행을 찾는 함수"""
    from airflow.models import DagRun
    from airflow.utils.session import provide_session

    @provide_session
    def get_recent_run(session=None):
        recent_run = session.query(DagRun).filter(
            DagRun.dag_id == 'trigger_example_dag',
            DagRun.state == 'success'
        ).order_by(DagRun.execution_date.desc()).first()

        if recent_run:
            return recent_run.execution_date
        return dt

    return get_recent_run()


wait_for_external_task = ExternalTaskSensor(
    task_id='wait_for_process_data',
    external_dag_id='trigger_example_dag',   # 기다릴 DAG
    external_task_id='process_data',         # 기다릴 태스크
    execution_date_fn=get_most_recent_dag_run,
    timeout=60,             # 1분 타임아웃
    poke_interval=5,        # 5초마다 확인
    allowed_states=['success'],
    dag=sensor_dag
)

analyze_task = PythonOperator(
    task_id='analyze_triggered_data',
    python_callable=analyze_triggered_data,
    dag=sensor_dag
)

wait_for_external_task >> analyze_task

답변

답변을 기다리고 있는 질문이에요
첫번째 답변을 남겨보세요!
dellahong님의 프로필 이미지
dellahong

작성한 질문수

질문하기