강의

멘토링

커뮤니티

인프런 커뮤니티 질문&답변

dellahong님의 프로필 이미지
dellahong

작성한 질문수

토스 개발자와 함께하는 Data Workflow Management 기반의 대용량 데이터 처리 설계 패턴

Dynamic DAG 생성 패턴

강의에서사용하신 root.py 파일이 안보여서 실습하면서 만든 텍스트 공유 드려요

작성

·

42

·

수정됨

0

 dynamicDag

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from db_utils import execute_query

import logging

def log_task_start(message):
    logging.info(f"[START] {message}")

def log_task_end(message):
    logging.info(f"[END] {message}")

# 분석할 주식 종목 리스트 (설정으로 관리)
STOCK_SYMBOLS = ['AAPL','GOOGL','MSFT','AMZN','TSLA']

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

def analyze_stock_data(symbol, **context):
    """주식 종목에 대해서, 최근 30일간의 주식 데이터를 분석"""
    log_task_start(f"주식 분석 - {symbol}")

    query = """
    SELECT
        symbol,
        AVG(price) as avg_price,
        MAX(price) as max_price,
        MIN(price) as min_price,
        SUM(volume) as total_volume,
        COUNT(*) as trading_days
    FROM stock_prices
    WHERE symbol = %s
    AND trade_date >= DATE_SUB(CURDATE(), INTERVAL 30 DAY)
    GROUP BY symbol
    """

    result = execute_query(query, [symbol])

    if result:
        data = result[0]
        print(f"{symbol} 분석 결과:")
        print(f"  평균 가격: ${data[1]:.2f}")
        print(f"  최고 가격: ${data[2]:.2f}")
        print(f"  최저 가격: ${data[3]:.2f}")
        print(f"  총 거래량: {data[4]:,}")
        print(f"  거래일수: {data[5]}일")

    log_task_end(f"주식 분석 - {symbol}")
    return result

def create_stock_dag(symbol):
    """개별 주식에 대한 DAG 생성 함수"""

    dag_id = f'stock_analysis_{symbol.lower()}'

    dag = DAG(
        dag_id,
        default_args=default_args,
        description=f'{symbol} 주식 데이터 분석 DAG',
        schedule='@daily',
        catchup=False,
        tags=['dynamic', 'stock', 'analysis', symbol]
    )

    # 데이터 품질 체크 태스크
    def check_data_quality(**context):
        query = """
        SELECT COUNT(*) as count
        FROM stock_prices
        WHERE symbol = %s
        AND trade_date = CURDATE() - INTERVAL 1 DAY
        """
        result = execute_query(query, [symbol])
        logging.info(f"{symbol} 데이터 품질 체크: {result[0][0] if result else 0}")
        return result

    data_quality_check = PythonOperator(
        task_id=f'check_data_quality_{symbol.lower()}',
        python_callable=check_data_quality,
        dag=dag
    )

    # 주식 분석 태스크
    analyze_task = PythonOperator(
        task_id=f'analyze_{symbol.lower()}',
        python_callable=analyze_stock_data,
        op_kwargs={'symbol': symbol},
        dag=dag
    )

    # 결과 저장 태스크
    def save_results(**context):
        query = """
        INSERT INTO daily_stats (symbol, summary_date, avg_price, total_volume, trade_count)
        SELECT 
            %s,
            CURDATE() - INTERVAL 1 DAY,
            AVG(price),
            SUM(volume),
            COUNT(*)
        FROM stock_prices
        WHERE symbol = %s
        AND trade_date = CURDATE() - INTERVAL 1 DAY
        ON DUPLICATE KEY UPDATE
            avg_price = VALUES(avg_price),
            total_volume = VALUES(total_volume),
            trade_count = VALUES(trade_count)
        """        
        result = execute_query(query, [symbol, symbol])
        logging.info(f"{symbol} 분석 결과 저장 완료")
        return result

    save_results = PythonOperator(
        task_id=f'save_analysis_{symbol.lower()}',
        python_callable=save_results,
        dag=dag
    )

    # 태스크 의존성 설정
    data_quality_check >> analyze_task >> save_results

    return dag

# Dynamic DAG 생성
# 각 주식 종목에 대해 별도의 DAG 생성
for symbol in STOCK_SYMBOLS:
    dag_id = f'stock_analysis_{symbol.lower()}'
    globals()[dag_id] = create_stock_dag(symbol)

CrossDag (트리거 센서)

from datetime import datetime, timedelta
from airflow import DAG, Dataset
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sensors.external_task import ExternalTaskSensor
from db_utils import execute_query
import logging

#기본 설정
default_args = {
    'owner': 'data-team',
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=2),
}


# ========================================================
# TriggerDagRunOperator
# ========================================================

def process_data(**context):
    """간단한 데이터 처리"""
    logging.info("데이터 처리 시작")

    query = "SELECT COUNT(*) FROM stock_prices"
    result = execute_query(query)
    count = result[0][0] if result else 0

    logging.info(f"처리된 레코드 수 : ${count}")
    return {"record_count": count}

trigger_dag = DAG(
    'trigger_example_dag',
    default_args=default_args,
    description='TriggerDagRunOperator 예제',
    schedule='@daily',
    catchup=False,
    tags=['cross-dag', 'trigger']
)

process_task = PythonOperator(
    task_id='process_data',
    python_callable=process_data,
    dag=trigger_dag
)

#다른 DAG를 트리거 (가장 기본적인 사용법)
trigger_next_dag = TriggerDagRunOperator(
    task_id='trigger_sensor_dag',
    trigger_dag_id='sensor_example_dag',
    wait_for_completion=False,   # 완료를 기다리지 않음
    dag=trigger_dag
)

process_task >> trigger_next_dag

# ========================================================
# ExternalTaskSensor
# ========================================================

# 두 번째 DAG: 외부 태스크 대기
def analyze_triggered_data(**context):
    """트리거된 후 분석 작업"""
    logging.info("외부 태스크 완료 후 분석 시작")

    # 간단한분석로직
    analysis_result = {"status": "completed", "timestamp": str(datetime.now())}
    logging.info(f"분석 완료: {analysis_result}")
    return analysis_result

sensor_dag = DAG(
    'sensor_example_dag',
    default_args=default_args,
    description='ExternalTaskSensor 예제',
    schedule=None,     # 트리거로만 실행됨
    catchup=False,
    tags=['cross-dag', 'sensor']
)

# 외부 DAG의 태스크 완료를 기다림
def get_most_recent_dag_run(dt):
    """가장 최근의 DAG 실행을 찾는 함수"""
    from airflow.models import DagRun
    from airflow.utils.session import provide_session

    @provide_session
    def get_recent_run(session=None):
        recent_run = session.query(DagRun).filter(
            DagRun.dag_id == 'trigger_example_dag',
            DagRun.state == 'success'
        ).order_by(DagRun.execution_date.desc()).first()

        if recent_run:
            return recent_run.execution_date
        return dt

    return get_recent_run()

wait_for_external_task = ExternalTaskSensor(
    task_id='wait_for_process_data',
    external_dag_id='trigger_example_dag',   # 기다릴 DAG
    external_task_id='process_data',         # 기다릴 태스크
    execution_date_fn=get_most_recent_dag_run, #최근성공한 실행 DAG
    timeout=60,             # 1분 타임아웃
    poke_interval=5,        # 5초마다 확인
    allowed_states=['success'], #성공 상태만 기다림
    dag=sensor_dag
)

analyze_task = PythonOperator(
    task_id='analyze_triggered_data',
    python_callable=analyze_triggered_data,
    dag=sensor_dag
)

wait_for_external_task >> analyze_task

# ========================================================
# Dataset Denpendencies
# ========================================================

# Dataset 정의 (Dataset Dependencies 용)
market_data_dataset = Dataset("market_data")
analysis_dataset = Dataset("analysis_result")

def create_market_data(**context):
    """마켓 데이터를 생성하고 Dataset을 업데이트"""

    logging.info("마켓 데이터 생성 시작")

    # 실제 데이터 생성 로직
    query = """
    SELECT symbol, AVG(price) as avg_price
    FROM stock_prices
    WHERE trade_date = CURDATE() - INTERVAL 1 DAY
    GROUP BY symbol
    LIMIT 3
    """

    result = execute_query(query)
    market_data = []

    if result:
        for row in result:
            market_data.append({"symbol": row[0], "avg_price": float(row[1])})

    logging.info(f"생성된 마켓 데이터: {market_data}")
    return market_data

dataset_producer_dag = DAG(
    'dataset_producer_dag',
    default_args=default_args,
    description='Dataset Dependencies - 생산자',
    schedule='@daily',
    catchup=False,
    tags=['cross-dag', 'dataset', 'producer']
)

create_data_task = PythonOperator(
    task_id='create_market_data',
    python_callable=create_market_data,
    outlets=[market_data_dataset],  # Dataset 업데이트 알림
    dag=dataset_producer_dag
)


# Consumer

def consume_market_data(**context):
    """생성된 마켓 데이터를 소비하여 분석"""
    logging.info("마켓 데이터 소비 및 분석 시작")

    # Dataset이 업데이트되면 자동으로 실행됨
    query = "SELECT COUNT(DISTINCT symbol) FROM stock_prices"
    result = execute_query(query)
    symbol_count = result[0][0] if result else 0

    analysis = {
        "total_symbols": symbol_count,
        "analysis_time": str(datetime.now()),
        "status": "Dataset 기반 분석 완료"
    }

    logging.info(f"Dataset 기반 분석 결과: {analysis}")
    return analysis


def generate_final_report(**context):
    """최종 리포트 생성"""
    logging.info("최종 리포트 생성")

    ti = context['ti']
    analysis = ti.xcom_pull(task_ids='consume_market_data')

    report = f"Dataset 기반 리포트: {analysis.get('total_symbols', 0)}개 심볼 분석 완료"
    logging.info(report)
    return report

dataset_consumer_dag = DAG(
    'dataset_consumer_dag',
    default_args=default_args,
    description='Dataset Dependencies - 소비자',
    schedule=[market_data_dataset],  # Dataset이 업데이트되면 실행
    catchup=False,
    tags=['cross-dag', 'dataset', 'consumer']
)

consume_task = PythonOperator(
    task_id='consume_market_data',
    python_callable=consume_market_data,
    outlets=[analysis_dataset],  # 다음 Dataset 업데이트
    dag=dataset_consumer_dag
)

report_task = PythonOperator(
    task_id='generate_final_report',
    python_callable=generate_final_report,
    dag=dataset_consumer_dag
)

consume_task >> report_task

 

 

답변 6

0

dellahong님의 프로필 이미지
dellahong
질문자

슬랙 실습

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
import json


def send_error_to_slack(context):
    """
    에러 발생 시 Slack으로 메시지 전송
    """

    # 에러 정보 추출
    task_instance = context.get('task_instance')
    dag_id = context.get('dag').dag_id
    task_id = task_instance.task_id
    execution_date = context.get('execution_date')
    exception = context.get('exception')

    # Slack 메시지 구성
    slack_message = {
        "text": "❌ Airflow Task Failed",
        "attachments": [
            {
                "color": "danger",
                "fields": [
                    {
                        "title": "DAG",
                        "value": dag_id,
                        "short": True
                    },
                    {
                        "title": "Task",
                        "value": task_id,
                        "short": True
                    },
                    {
                        "title": "Time",
                        "value": str(execution_date),
                        "short": True
                    },
                    {
                        "title": "Error Message",
                        "value": str(exception) if exception else "Unknown error",
                        "short": False
                    }
                ]
            }
        ]
    }

    # Slack으로 전송
    try:
        slack_hook = SlackWebhookHook(
            slack_webhook_conn_id='slack_default'
        )
        slack_hook.send_dict(slack_message)
        print("Error notification sent to Slack")
    except Exception as e:
        print(f"Failed to send Slack notification: {e}")


# 테스트용 함수들
def task_that_fails():
    """실패하는 태스크"""
    raise Exception("This task intentionally fails for testing")


def task_that_succeeds():
    """성공하는 태스크 (알림 없음)"""
    print("Task completed successfully")
    return "success"


# DAG 설정
default_args = {
    'owner': 'data-team',
    'start_date': datetime(2024, 1, 1),
    'retries': 0,  # 재시도 없음
    'on_failure_callback': send_error_to_slack,  # 에러 시에만 알림
}

dag = DAG(
    'slack_error_notification',
    default_args=default_args,
    description='에러 발생 시 Slack 알림',
    schedule_interval='@daily',
    catchup=False
)

# 태스크 정의
fail_task = PythonOperator(
    task_id='failing_task',
    python_callable=task_that_fails,
    dag=dag
)

success_task = PythonOperator(
    task_id='success_task',
    python_callable=task_that_succeeds,
    dag=dag
)

# 태스크 실행 순서
success_task >> fail_task


0

Hong님의 프로필 이미지
Hong
지식공유자

혹시 dellahong님 어떤 root.py가 보이지 않는다고 하시는지 공유 가능하실까요??

dellahong님의 프로필 이미지
dellahong
질문자

섹션7에서 실습하신 소스코드가 제공해주신자료에 없습니다~

0

dellahong님의 프로필 이미지
dellahong
질문자

Parallel

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import DummyOperator
import logging
import time
import random
from concurrent.futures import ThreadPoolExecutor, as_completed

default_args = {
    'owner': 'data-team',
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=2),
}

def generate_data(total_records=500):
    symbols = ['AAPL', 'GOOGL', 'MSFT']
    data = []
    for i in range(total_records):
        data.append({
            'id': i + 1,
            'symbol': random.choice(symbols),
            'price': round(random.uniform(100, 300), 2),
            'volume': random.randint(1000, 5000)
        })
    logging.info(f"데이터 생성: {len(data)}건")
    return data


def create_batches(data, batch_size):
    batches = []
    for i in range(0, len(data), batch_size):
        batches.append(data[i:i + batch_size])
    logging.info(f"배치 생성: {len(batches)}개")
    return batches

def process_batch(batch_data, batch_id):
    time.sleep(1) #일련의 데이터 작업이 보통 들어감 
    total_volume = sum(record['volume'] for record in batch_data)
    result = {
        'batch_id': batch_id,
        'count': len(batch_data),
        'total_volume': total_volume
    }
    logging.info(f"배치 {batch_id} 완료")
    return result

def parallel_processing():
    start_time = time.time()
    data = generate_data(500)
    batches = create_batches(data, 50)

    results = []
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = {
            executor.submit(process_batch, batch, f"batch_{i+1}"): i
            for i, batch in enumerate(batches)
        }

    for future in as_completed(futures):
        result = future.result()
        results.append(result)

    processing_time = round(time.time() - start_time, 2)
    logging.info(f"병렬 처리 완료: {processing_time}초, 총 {len(results)}개 배치")
    return {
        'time': processing_time,
        'batches': len(batches),
        'results': results
    }

parallel_dag = DAG(
    'parallel_processing_dag',
    default_args=default_args,
    description='병렬 처리 예제',
    schedule='@daily',
    catchup=False,
    tags=['parallel']
)

start = DummyOperator(task_id='start', dag=parallel_dag)

parallel_task = PythonOperator(
    task_id='parallel_task',
    python_callable=parallel_processing,
    dag=parallel_dag
)

end = DummyOperator(task_id='end', dag=parallel_dag)

start >> parallel_task >> end

0

dellahong님의 프로필 이미지
dellahong
질문자

Custom Operator

from datetime import datetime, timedelta
from airflow import DAG
from airflow.models import BaseOperator
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.decorators import apply_defaults
from db_utils import execute_query
import logging

#기본 설정
default_args = {
    'owner': 'data-team',
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=2),
}

custom_operator_dag = DAG(
    'custom_operator_dag',
    default_args=default_args,
    description='Custom Operator 간단예제',
    schedule='@daily',
    catchup=False,
    tags=['custom-operator', 'simple']
)

start = DummyOperator(task_id='start', dag=custom_operator_dag)

# ================= Custom Operator =================

class StockDataOperator(BaseOperator):
    """주식 데이터를 조회하는 커스텀 Operator"""

    @apply_defaults
    def __init__(self, symbol, days=7, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.symbol = symbol
        self.days = days

    def execute(self, context):
        """실제 실행되는 메소드"""
        logging.info(f"{self.symbol} 주식 데이터 조회 시작")

        query = """
        SELECT symbol, COUNT(*) as count, AVG(price) as avg_price
        FROM stock_prices
        WHERE symbol = %s
        AND trade_date >= CURDATE() - INTERVAL %s DAY
        GROUP BY symbol
        """

        result = execute_query(query, [self.symbol, self.days])

        if result:
            data = {
                'symbol': result[0][0],
                'count': result[0][1],
                'avg_price': float(result[0][2]) if result[0][2] else 0.0
            }
            logging.info(f"{self.symbol} 조회완료 {data}")
            return data
        else:
            logging.warning(f"{self.symbol} 데이터가 없습니다")
            return {'symbol': self.symbol, 'count': 0, 'avg_price': 0.0}

#Custom Operator 사용
custom_task = StockDataOperator(
    task_id='extract_stock_data',
    symbol='APPL',
    days=30,
    dag=custom_operator_dag
)

# ================= 기존 Python Operator =================
 
 
# 기존 PythonOperator와 비교
def traditional_function(**context):
    """기존 방식의 함수"""
    logging.info("기존 PythonOperator 방식")

    query = "SELECT COUNT(*) FROM stock_prices WHERE symbol = 'AAPL'"
    result = execute_query(query)
    count = result[0][0] if result else 0

    return {'symbol': 'AAPL', 'count': count}

traditional_task = PythonOperator(
    task_id='traditional_approach',
    python_callable=traditional_function,
    dag=custom_operator_dag
)

end = DummyOperator(task_id='end', dag=custom_operator_dag)

#Task 의존성
start >> [custom_task, traditional_task] >> end
    

0

dellahong님의 프로필 이미지
dellahong
질문자

 

TaskGroup

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.task_group import TaskGroup
from db_utils import execute_query
import logging

#기본 설정
default_args = {
    'owner': 'data-team',
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=2),
}

def extract_data(symbol, **context):
    """데이터 추출"""
    logging.info(f"{symbol} 데이터 추출")

    query = "SELECT COUNT(*) FROM stock_prices WHERE symbol = %s"
    result = execute_query(query, [symbol])

    count = result[0][0] if result else 0
    logging.info(f"{symbol}: {count}건 추출")
    return {'symbol': symbol, 'count': count}


def process_data(symbol, **context):
    """데이터 처리"""
    logging.info(f"{symbol} 데이터 처리")

    ti = context['ti']  # Task Instance (TI)
    extracted = ti.xcom_pull(task_ids=f'extract_group.extract_{symbol.lower()}')

    processed = {
        'symbol': symbol,
        'count': extracted['count'],
        'status': 'processed' if extracted['count'] > 0 else 'empty'
    }

    logging.info(f"{symbol} 처리 완료: {processed}")
    return processed

def generate_report(**context):
    """전체 리포트 생성"""
    logging.info("리포트 생성 시작")

    ti = context['ti']
    symbols = ['AAPL', 'GOOGL']

    total_count = 0
    for symbol in symbols:
        processed = ti.xcom_pull(task_ids=f'process_group.nested_group.process_{symbol.lower()}')
        if processed:
            total_count += processed['count']

    report = f"총 {total_count}건 데이터 처리 완료"
    logging.info(report)
    return report

# TaskGroup DAG
taskgroup_dag = DAG(
    'taskgroup_simple_dag',
    default_args=default_args,
    description='TaskGroup 간단 예제',
    schedule='@daily',
    catchup=False,
    tags=['taskgroup', 'simple']
)

start = DummyOperator(task_id='start', dag=taskgroup_dag)

# 기본 Task Group
with TaskGroup(group_id='extract_group', dag=taskgroup_dag) as extract_group:

    symbols = ['AAPL', 'GOOGL']

    for symbol in symbols:
        PythonOperator(
            task_id=f'extract_{symbol.lower()}',
            python_callable=extract_data,
            op_kwargs={'symbol': symbol},
            dag=taskgroup_dag
        )

with TaskGroup(group_id='process_group', dag=taskgroup_dag) as process_group:

    # 중첩된 TaskGroup
    with TaskGroup(group_id='nested_group', dag=taskgroup_dag) as nested_group:

        for symbol in symbols:
            PythonOperator(
                task_id=f'process_{symbol.lower()}',
                python_callable=process_data,
                op_kwargs={'symbol': symbol},
                dag=taskgroup_dag
            )

    # 리포트 태스크
    report_task = PythonOperator(
        task_id='generate_report',
        python_callable=generate_report,
        dag=taskgroup_dag
    )

    # 중첩 그룹 완료 후 리포트 실행
    nested_group >> report_task

end = DummyOperator(task_id='end', dag=taskgroup_dag)

start >> extract_group >> process_group >> end

 

0

Hong님의 프로필 이미지
Hong
지식공유자

잠시 확인하고 말씀드리겠습니다. 감사합니다!

dellahong님의 프로필 이미지
dellahong

작성한 질문수

질문하기