inflearn logo
강의

Course

Instructor

Designing Large-Scale Data Processing Patterns Based on Data Workflow Management with Toss Developers

Dynamic DAG Generation Patterns

강의에서사용하신 root.py 파일이 안보여서 실습하면서 만든 텍스트 공유 드려요

Resolved

118

dellahong

1 asked

0

 dynamicDag

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from db_utils import execute_query

import logging

def log_task_start(message):
    logging.info(f"[START] {message}")

def log_task_end(message):
    logging.info(f"[END] {message}")

# 분석할 주식 종목 리스트 (설정으로 관리)
STOCK_SYMBOLS = ['AAPL','GOOGL','MSFT','AMZN','TSLA']

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

def analyze_stock_data(symbol, **context):
    """주식 종목에 대해서, 최근 30일간의 주식 데이터를 분석"""
    log_task_start(f"주식 분석 - {symbol}")

    query = """
    SELECT
        symbol,
        AVG(price) as avg_price,
        MAX(price) as max_price,
        MIN(price) as min_price,
        SUM(volume) as total_volume,
        COUNT(*) as trading_days
    FROM stock_prices
    WHERE symbol = %s
    AND trade_date >= DATE_SUB(CURDATE(), INTERVAL 30 DAY)
    GROUP BY symbol
    """

    result = execute_query(query, [symbol])

    if result:
        data = result[0]
        print(f"{symbol} 분석 결과:")
        print(f"  평균 가격: ${data[1]:.2f}")
        print(f"  최고 가격: ${data[2]:.2f}")
        print(f"  최저 가격: ${data[3]:.2f}")
        print(f"  총 거래량: {data[4]:,}")
        print(f"  거래일수: {data[5]}일")

    log_task_end(f"주식 분석 - {symbol}")
    return result

def create_stock_dag(symbol):
    """개별 주식에 대한 DAG 생성 함수"""

    dag_id = f'stock_analysis_{symbol.lower()}'

    dag = DAG(
        dag_id,
        default_args=default_args,
        description=f'{symbol} 주식 데이터 분석 DAG',
        schedule='@daily',
        catchup=False,
        tags=['dynamic', 'stock', 'analysis', symbol]
    )

    # 데이터 품질 체크 태스크
    def check_data_quality(**context):
        query = """
        SELECT COUNT(*) as count
        FROM stock_prices
        WHERE symbol = %s
        AND trade_date = CURDATE() - INTERVAL 1 DAY
        """
        result = execute_query(query, [symbol])
        logging.info(f"{symbol} 데이터 품질 체크: {result[0][0] if result else 0}")
        return result

    data_quality_check = PythonOperator(
        task_id=f'check_data_quality_{symbol.lower()}',
        python_callable=check_data_quality,
        dag=dag
    )

    # 주식 분석 태스크
    analyze_task = PythonOperator(
        task_id=f'analyze_{symbol.lower()}',
        python_callable=analyze_stock_data,
        op_kwargs={'symbol': symbol},
        dag=dag
    )

    # 결과 저장 태스크
    def save_results(**context):
        query = """
        INSERT INTO daily_stats (symbol, summary_date, avg_price, total_volume, trade_count)
        SELECT 
            %s,
            CURDATE() - INTERVAL 1 DAY,
            AVG(price),
            SUM(volume),
            COUNT(*)
        FROM stock_prices
        WHERE symbol = %s
        AND trade_date = CURDATE() - INTERVAL 1 DAY
        ON DUPLICATE KEY UPDATE
            avg_price = VALUES(avg_price),
            total_volume = VALUES(total_volume),
            trade_count = VALUES(trade_count)
        """        
        result = execute_query(query, [symbol, symbol])
        logging.info(f"{symbol} 분석 결과 저장 완료")
        return result

    save_results = PythonOperator(
        task_id=f'save_analysis_{symbol.lower()}',
        python_callable=save_results,
        dag=dag
    )

    # 태스크 의존성 설정
    data_quality_check >> analyze_task >> save_results

    return dag

# Dynamic DAG 생성
# 각 주식 종목에 대해 별도의 DAG 생성
for symbol in STOCK_SYMBOLS:
    dag_id = f'stock_analysis_{symbol.lower()}'
    globals()[dag_id] = create_stock_dag(symbol)

CrossDag (트리거 센서)

from datetime import datetime, timedelta
from airflow import DAG, Dataset
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sensors.external_task import ExternalTaskSensor
from db_utils import execute_query
import logging

#기본 설정
default_args = {
    'owner': 'data-team',
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=2),
}


# ========================================================
# TriggerDagRunOperator
# ========================================================

def process_data(**context):
    """간단한 데이터 처리"""
    logging.info("데이터 처리 시작")

    query = "SELECT COUNT(*) FROM stock_prices"
    result = execute_query(query)
    count = result[0][0] if result else 0

    logging.info(f"처리된 레코드 수 : ${count}")
    return {"record_count": count}

trigger_dag = DAG(
    'trigger_example_dag',
    default_args=default_args,
    description='TriggerDagRunOperator 예제',
    schedule='@daily',
    catchup=False,
    tags=['cross-dag', 'trigger']
)

process_task = PythonOperator(
    task_id='process_data',
    python_callable=process_data,
    dag=trigger_dag
)

#다른 DAG를 트리거 (가장 기본적인 사용법)
trigger_next_dag = TriggerDagRunOperator(
    task_id='trigger_sensor_dag',
    trigger_dag_id='sensor_example_dag',
    wait_for_completion=False,   # 완료를 기다리지 않음
    dag=trigger_dag
)

process_task >> trigger_next_dag

# ========================================================
# ExternalTaskSensor
# ========================================================

# 두 번째 DAG: 외부 태스크 대기
def analyze_triggered_data(**context):
    """트리거된 후 분석 작업"""
    logging.info("외부 태스크 완료 후 분석 시작")

    # 간단한분석로직
    analysis_result = {"status": "completed", "timestamp": str(datetime.now())}
    logging.info(f"분석 완료: {analysis_result}")
    return analysis_result

sensor_dag = DAG(
    'sensor_example_dag',
    default_args=default_args,
    description='ExternalTaskSensor 예제',
    schedule=None,     # 트리거로만 실행됨
    catchup=False,
    tags=['cross-dag', 'sensor']
)

# 외부 DAG의 태스크 완료를 기다림
def get_most_recent_dag_run(dt):
    """가장 최근의 DAG 실행을 찾는 함수"""
    from airflow.models import DagRun
    from airflow.utils.session import provide_session

    @provide_session
    def get_recent_run(session=None):
        recent_run = session.query(DagRun).filter(
            DagRun.dag_id == 'trigger_example_dag',
            DagRun.state == 'success'
        ).order_by(DagRun.execution_date.desc()).first()

        if recent_run:
            return recent_run.execution_date
        return dt

    return get_recent_run()

wait_for_external_task = ExternalTaskSensor(
    task_id='wait_for_process_data',
    external_dag_id='trigger_example_dag',   # 기다릴 DAG
    external_task_id='process_data',         # 기다릴 태스크
    execution_date_fn=get_most_recent_dag_run, #최근성공한 실행 DAG
    timeout=60,             # 1분 타임아웃
    poke_interval=5,        # 5초마다 확인
    allowed_states=['success'], #성공 상태만 기다림
    dag=sensor_dag
)

analyze_task = PythonOperator(
    task_id='analyze_triggered_data',
    python_callable=analyze_triggered_data,
    dag=sensor_dag
)

wait_for_external_task >> analyze_task

# ========================================================
# Dataset Denpendencies
# ========================================================

# Dataset 정의 (Dataset Dependencies 용)
market_data_dataset = Dataset("market_data")
analysis_dataset = Dataset("analysis_result")

def create_market_data(**context):
    """마켓 데이터를 생성하고 Dataset을 업데이트"""

    logging.info("마켓 데이터 생성 시작")

    # 실제 데이터 생성 로직
    query = """
    SELECT symbol, AVG(price) as avg_price
    FROM stock_prices
    WHERE trade_date = CURDATE() - INTERVAL 1 DAY
    GROUP BY symbol
    LIMIT 3
    """

    result = execute_query(query)
    market_data = []

    if result:
        for row in result:
            market_data.append({"symbol": row[0], "avg_price": float(row[1])})

    logging.info(f"생성된 마켓 데이터: {market_data}")
    return market_data

dataset_producer_dag = DAG(
    'dataset_producer_dag',
    default_args=default_args,
    description='Dataset Dependencies - 생산자',
    schedule='@daily',
    catchup=False,
    tags=['cross-dag', 'dataset', 'producer']
)

create_data_task = PythonOperator(
    task_id='create_market_data',
    python_callable=create_market_data,
    outlets=[market_data_dataset],  # Dataset 업데이트 알림
    dag=dataset_producer_dag
)


# Consumer

def consume_market_data(**context):
    """생성된 마켓 데이터를 소비하여 분석"""
    logging.info("마켓 데이터 소비 및 분석 시작")

    # Dataset이 업데이트되면 자동으로 실행됨
    query = "SELECT COUNT(DISTINCT symbol) FROM stock_prices"
    result = execute_query(query)
    symbol_count = result[0][0] if result else 0

    analysis = {
        "total_symbols": symbol_count,
        "analysis_time": str(datetime.now()),
        "status": "Dataset 기반 분석 완료"
    }

    logging.info(f"Dataset 기반 분석 결과: {analysis}")
    return analysis


def generate_final_report(**context):
    """최종 리포트 생성"""
    logging.info("최종 리포트 생성")

    ti = context['ti']
    analysis = ti.xcom_pull(task_ids='consume_market_data')

    report = f"Dataset 기반 리포트: {analysis.get('total_symbols', 0)}개 심볼 분석 완료"
    logging.info(report)
    return report

dataset_consumer_dag = DAG(
    'dataset_consumer_dag',
    default_args=default_args,
    description='Dataset Dependencies - 소비자',
    schedule=[market_data_dataset],  # Dataset이 업데이트되면 실행
    catchup=False,
    tags=['cross-dag', 'dataset', 'consumer']
)

consume_task = PythonOperator(
    task_id='consume_market_data',
    python_callable=consume_market_data,
    outlets=[analysis_dataset],  # 다음 Dataset 업데이트
    dag=dataset_consumer_dag
)

report_task = PythonOperator(
    task_id='generate_final_report',
    python_callable=generate_final_report,
    dag=dataset_consumer_dag
)

consume_task >> report_task

 

 

빅데이터 docker docker-compose airflow

Answer 7

1

dellahong

압축파일을 윈도우기본 압축 프로그램으로 풀었더니 실습코드 폴더가 안나오는 현상이 있었습니다! 실습코드가 없어진줄 알있는데 실습파일 다시 잘 확인했습니다~

0

dellahong

슬랙 실습

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
import json


def send_error_to_slack(context):
    """
    에러 발생 시 Slack으로 메시지 전송
    """

    # 에러 정보 추출
    task_instance = context.get('task_instance')
    dag_id = context.get('dag').dag_id
    task_id = task_instance.task_id
    execution_date = context.get('execution_date')
    exception = context.get('exception')

    # Slack 메시지 구성
    slack_message = {
        "text": "❌ Airflow Task Failed",
        "attachments": [
            {
                "color": "danger",
                "fields": [
                    {
                        "title": "DAG",
                        "value": dag_id,
                        "short": True
                    },
                    {
                        "title": "Task",
                        "value": task_id,
                        "short": True
                    },
                    {
                        "title": "Time",
                        "value": str(execution_date),
                        "short": True
                    },
                    {
                        "title": "Error Message",
                        "value": str(exception) if exception else "Unknown error",
                        "short": False
                    }
                ]
            }
        ]
    }

    # Slack으로 전송
    try:
        slack_hook = SlackWebhookHook(
            slack_webhook_conn_id='slack_default'
        )
        slack_hook.send_dict(slack_message)
        print("Error notification sent to Slack")
    except Exception as e:
        print(f"Failed to send Slack notification: {e}")


# 테스트용 함수들
def task_that_fails():
    """실패하는 태스크"""
    raise Exception("This task intentionally fails for testing")


def task_that_succeeds():
    """성공하는 태스크 (알림 없음)"""
    print("Task completed successfully")
    return "success"


# DAG 설정
default_args = {
    'owner': 'data-team',
    'start_date': datetime(2024, 1, 1),
    'retries': 0,  # 재시도 없음
    'on_failure_callback': send_error_to_slack,  # 에러 시에만 알림
}

dag = DAG(
    'slack_error_notification',
    default_args=default_args,
    description='에러 발생 시 Slack 알림',
    schedule_interval='@daily',
    catchup=False
)

# 태스크 정의
fail_task = PythonOperator(
    task_id='failing_task',
    python_callable=task_that_fails,
    dag=dag
)

success_task = PythonOperator(
    task_id='success_task',
    python_callable=task_that_succeeds,
    dag=dag
)

# 태스크 실행 순서
success_task >> fail_task


0

Hong

혹시 dellahong님 어떤 root.py가 보이지 않는다고 하시는지 공유 가능하실까요??

0

dellahong

섹션7에서 실습하신 소스코드가 제공해주신자료에 없습니다~

0

dellahong

Parallel

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import DummyOperator
import logging
import time
import random
from concurrent.futures import ThreadPoolExecutor, as_completed

default_args = {
    'owner': 'data-team',
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=2),
}

def generate_data(total_records=500):
    symbols = ['AAPL', 'GOOGL', 'MSFT']
    data = []
    for i in range(total_records):
        data.append({
            'id': i + 1,
            'symbol': random.choice(symbols),
            'price': round(random.uniform(100, 300), 2),
            'volume': random.randint(1000, 5000)
        })
    logging.info(f"데이터 생성: {len(data)}건")
    return data


def create_batches(data, batch_size):
    batches = []
    for i in range(0, len(data), batch_size):
        batches.append(data[i:i + batch_size])
    logging.info(f"배치 생성: {len(batches)}개")
    return batches

def process_batch(batch_data, batch_id):
    time.sleep(1) #일련의 데이터 작업이 보통 들어감 
    total_volume = sum(record['volume'] for record in batch_data)
    result = {
        'batch_id': batch_id,
        'count': len(batch_data),
        'total_volume': total_volume
    }
    logging.info(f"배치 {batch_id} 완료")
    return result

def parallel_processing():
    start_time = time.time()
    data = generate_data(500)
    batches = create_batches(data, 50)

    results = []
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = {
            executor.submit(process_batch, batch, f"batch_{i+1}"): i
            for i, batch in enumerate(batches)
        }

    for future in as_completed(futures):
        result = future.result()
        results.append(result)

    processing_time = round(time.time() - start_time, 2)
    logging.info(f"병렬 처리 완료: {processing_time}초, 총 {len(results)}개 배치")
    return {
        'time': processing_time,
        'batches': len(batches),
        'results': results
    }

parallel_dag = DAG(
    'parallel_processing_dag',
    default_args=default_args,
    description='병렬 처리 예제',
    schedule='@daily',
    catchup=False,
    tags=['parallel']
)

start = DummyOperator(task_id='start', dag=parallel_dag)

parallel_task = PythonOperator(
    task_id='parallel_task',
    python_callable=parallel_processing,
    dag=parallel_dag
)

end = DummyOperator(task_id='end', dag=parallel_dag)

start >> parallel_task >> end

0

dellahong

Custom Operator

from datetime import datetime, timedelta
from airflow import DAG
from airflow.models import BaseOperator
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.decorators import apply_defaults
from db_utils import execute_query
import logging

#기본 설정
default_args = {
    'owner': 'data-team',
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=2),
}

custom_operator_dag = DAG(
    'custom_operator_dag',
    default_args=default_args,
    description='Custom Operator 간단예제',
    schedule='@daily',
    catchup=False,
    tags=['custom-operator', 'simple']
)

start = DummyOperator(task_id='start', dag=custom_operator_dag)

# ================= Custom Operator =================

class StockDataOperator(BaseOperator):
    """주식 데이터를 조회하는 커스텀 Operator"""

    @apply_defaults
    def __init__(self, symbol, days=7, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.symbol = symbol
        self.days = days

    def execute(self, context):
        """실제 실행되는 메소드"""
        logging.info(f"{self.symbol} 주식 데이터 조회 시작")

        query = """
        SELECT symbol, COUNT(*) as count, AVG(price) as avg_price
        FROM stock_prices
        WHERE symbol = %s
        AND trade_date >= CURDATE() - INTERVAL %s DAY
        GROUP BY symbol
        """

        result = execute_query(query, [self.symbol, self.days])

        if result:
            data = {
                'symbol': result[0][0],
                'count': result[0][1],
                'avg_price': float(result[0][2]) if result[0][2] else 0.0
            }
            logging.info(f"{self.symbol} 조회완료 {data}")
            return data
        else:
            logging.warning(f"{self.symbol} 데이터가 없습니다")
            return {'symbol': self.symbol, 'count': 0, 'avg_price': 0.0}

#Custom Operator 사용
custom_task = StockDataOperator(
    task_id='extract_stock_data',
    symbol='APPL',
    days=30,
    dag=custom_operator_dag
)

# ================= 기존 Python Operator =================
 
 
# 기존 PythonOperator와 비교
def traditional_function(**context):
    """기존 방식의 함수"""
    logging.info("기존 PythonOperator 방식")

    query = "SELECT COUNT(*) FROM stock_prices WHERE symbol = 'AAPL'"
    result = execute_query(query)
    count = result[0][0] if result else 0

    return {'symbol': 'AAPL', 'count': count}

traditional_task = PythonOperator(
    task_id='traditional_approach',
    python_callable=traditional_function,
    dag=custom_operator_dag
)

end = DummyOperator(task_id='end', dag=custom_operator_dag)

#Task 의존성
start >> [custom_task, traditional_task] >> end
    

0

dellahong

 

TaskGroup

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.task_group import TaskGroup
from db_utils import execute_query
import logging

#기본 설정
default_args = {
    'owner': 'data-team',
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=2),
}

def extract_data(symbol, **context):
    """데이터 추출"""
    logging.info(f"{symbol} 데이터 추출")

    query = "SELECT COUNT(*) FROM stock_prices WHERE symbol = %s"
    result = execute_query(query, [symbol])

    count = result[0][0] if result else 0
    logging.info(f"{symbol}: {count}건 추출")
    return {'symbol': symbol, 'count': count}


def process_data(symbol, **context):
    """데이터 처리"""
    logging.info(f"{symbol} 데이터 처리")

    ti = context['ti']  # Task Instance (TI)
    extracted = ti.xcom_pull(task_ids=f'extract_group.extract_{symbol.lower()}')

    processed = {
        'symbol': symbol,
        'count': extracted['count'],
        'status': 'processed' if extracted['count'] > 0 else 'empty'
    }

    logging.info(f"{symbol} 처리 완료: {processed}")
    return processed

def generate_report(**context):
    """전체 리포트 생성"""
    logging.info("리포트 생성 시작")

    ti = context['ti']
    symbols = ['AAPL', 'GOOGL']

    total_count = 0
    for symbol in symbols:
        processed = ti.xcom_pull(task_ids=f'process_group.nested_group.process_{symbol.lower()}')
        if processed:
            total_count += processed['count']

    report = f"총 {total_count}건 데이터 처리 완료"
    logging.info(report)
    return report

# TaskGroup DAG
taskgroup_dag = DAG(
    'taskgroup_simple_dag',
    default_args=default_args,
    description='TaskGroup 간단 예제',
    schedule='@daily',
    catchup=False,
    tags=['taskgroup', 'simple']
)

start = DummyOperator(task_id='start', dag=taskgroup_dag)

# 기본 Task Group
with TaskGroup(group_id='extract_group', dag=taskgroup_dag) as extract_group:

    symbols = ['AAPL', 'GOOGL']

    for symbol in symbols:
        PythonOperator(
            task_id=f'extract_{symbol.lower()}',
            python_callable=extract_data,
            op_kwargs={'symbol': symbol},
            dag=taskgroup_dag
        )

with TaskGroup(group_id='process_group', dag=taskgroup_dag) as process_group:

    # 중첩된 TaskGroup
    with TaskGroup(group_id='nested_group', dag=taskgroup_dag) as nested_group:

        for symbol in symbols:
            PythonOperator(
                task_id=f'process_{symbol.lower()}',
                python_callable=process_data,
                op_kwargs={'symbol': symbol},
                dag=taskgroup_dag
            )

    # 리포트 태스크
    report_task = PythonOperator(
        task_id='generate_report',
        python_callable=generate_report,
        dag=taskgroup_dag
    )

    # 중첩 그룹 완료 후 리포트 실행
    nested_group >> report_task

end = DummyOperator(task_id='end', dag=taskgroup_dag)

start >> extract_group >> process_group >> end

 

0

Hong

잠시 확인하고 말씀드리겠습니다. 감사합니다!

docker compose에 대해 질문드립니다.

0

2

0

작업형 1 유형 부분

0

3

1

작업형 1 (삭제예정, 구 버전)

0

27

2

수강기간 연장 문의드립니다.

0

19

1

2유형 레이블 인코딩 VS 원핫 인코딩

0

19

3

수강기간 연장 문의드립니다.

0

26

1

인덱스 슬라이싱

0

26

2

astro dev start - python 라이브러리 설치 fail

0

21

1

맥북 환경구성 에러

0

19

2

Free Edition 실습 영상은 아직 업데이트전인가요?

0

22

1

질문 드립니다.

0

45

2

강의 내용 관련 질문드립니다~

0

43

2

수강 연장 문의

0

54

2

강의자료 일괄 다운로드

0

49

2

list 문제 질문드립니다~

0

34

2

빅분기 실기 12회 재도전

0

52

2

강의 기간 연장 가능여부 검토 요청건

0

38

2

수강기간 연장 문의 드립니다

0

43

2

JPA Repository 질문이 있습니다!

1

33

2

페이지네이션 처리를 쿼리에서 하는 방식 질문

1

36

1

수강기간 연장 문의드립니다

0

50

2

디바이스 페어링 문의

0

32

2

질문이요

0

55

2

실습환경에 대해 질문이 있습니다!

0

65

2