Inflearn Community Q&A
The root.py file used in the lecture wasn't visible, so I'm sharing the code I wrote while following along.
dynamicDag
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from db_utils import execute_query
import logging


def log_task_start(message):
    logging.info(f"[START] {message}")


def log_task_end(message):
    logging.info(f"[END] {message}")


# List of stock symbols to analyze (managed as configuration)
STOCK_SYMBOLS = ['AAPL', 'GOOGL', 'MSFT', 'AMZN', 'TSLA']

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}


def analyze_stock_data(symbol, **context):
    """Analyze the last 30 days of price data for one stock symbol."""
    log_task_start(f"Stock analysis - {symbol}")
    query = """
        SELECT
            symbol,
            AVG(price) as avg_price,
            MAX(price) as max_price,
            MIN(price) as min_price,
            SUM(volume) as total_volume,
            COUNT(*) as trading_days
        FROM stock_prices
        WHERE symbol = %s
          AND trade_date >= DATE_SUB(CURDATE(), INTERVAL 30 DAY)
        GROUP BY symbol
    """
    result = execute_query(query, [symbol])
    if result:
        data = result[0]
        print(f"{symbol} analysis results:")
        print(f"  Average price: ${data[1]:.2f}")
        print(f"  Highest price: ${data[2]:.2f}")
        print(f"  Lowest price: ${data[3]:.2f}")
        print(f"  Total volume: {data[4]:,}")
        print(f"  Trading days: {data[5]}")
    log_task_end(f"Stock analysis - {symbol}")
    return result


def create_stock_dag(symbol):
    """Factory that builds one DAG per stock symbol."""
    dag_id = f'stock_analysis_{symbol.lower()}'
    dag = DAG(
        dag_id,
        default_args=default_args,
        description=f'{symbol} stock data analysis DAG',
        schedule='@daily',
        catchup=False,
        tags=['dynamic', 'stock', 'analysis', symbol]
    )

    # Data quality check task
    def check_data_quality(**context):
        query = """
            SELECT COUNT(*) as count
            FROM stock_prices
            WHERE symbol = %s
              AND trade_date = CURDATE() - INTERVAL 1 DAY
        """
        result = execute_query(query, [symbol])
        logging.info(f"{symbol} data quality check: {result[0][0] if result else 0}")
        return result

    data_quality_check = PythonOperator(
        task_id=f'check_data_quality_{symbol.lower()}',
        python_callable=check_data_quality,
        dag=dag
    )

    # Stock analysis task
    analyze_task = PythonOperator(
        task_id=f'analyze_{symbol.lower()}',
        python_callable=analyze_stock_data,
        op_kwargs={'symbol': symbol},
        dag=dag
    )

    # Result-saving task
    def save_results(**context):
        query = """
            INSERT INTO daily_stats (symbol, summary_date, avg_price, total_volume, trade_count)
            SELECT
                %s,
                CURDATE() - INTERVAL 1 DAY,
                AVG(price),
                SUM(volume),
                COUNT(*)
            FROM stock_prices
            WHERE symbol = %s
              AND trade_date = CURDATE() - INTERVAL 1 DAY
            ON DUPLICATE KEY UPDATE
                avg_price = VALUES(avg_price),
                total_volume = VALUES(total_volume),
                trade_count = VALUES(trade_count)
        """
        result = execute_query(query, [symbol, symbol])
        logging.info(f"{symbol} analysis results saved")
        return result

    # Named save_task so it does not shadow the save_results callable above
    save_task = PythonOperator(
        task_id=f'save_analysis_{symbol.lower()}',
        python_callable=save_results,
        dag=dag
    )

    # Task dependencies
    data_quality_check >> analyze_task >> save_task

    return dag


# Dynamic DAG creation: register a separate DAG for each stock symbol
for symbol in STOCK_SYMBOLS:
    dag_id = f'stock_analysis_{symbol.lower()}'
    globals()[dag_id] = create_stock_dag(symbol)
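Airflow only discovers the generated DAGs because each one is bound to a module-level name via globals(). A quick local sanity check, a hypothetical snippet that is not part of the original file, is to run the module directly:

# Hypothetical local check: run this file with `python <file>.py`
if __name__ == "__main__":
    generated = [name for name in globals() if name.startswith("stock_analysis_")]
    print(f"Generated {len(generated)} DAGs: {generated}")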
CrossDag (Trigger / Sensor)

from datetime import datetime, timedelta
from airflow import DAG, Dataset
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sensors.external_task import ExternalTaskSensor
from db_utils import execute_query
import logging

# Default settings
default_args = {
    'owner': 'data-team',
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=2),
}

# ========================================================
# TriggerDagRunOperator
# ========================================================
def process_data(**context):
    """Simple data processing"""
    logging.info("Starting data processing")
    query = "SELECT COUNT(*) FROM stock_prices"
    result = execute_query(query)
    count = result[0][0] if result else 0
    logging.info(f"Records processed: {count}")
    return {"record_count": count}

trigger_dag = DAG(
    'trigger_example_dag',
    default_args=default_args,
    description='TriggerDagRunOperator example',
    schedule='@daily',
    catchup=False,
    tags=['cross-dag', 'trigger']
)

process_task = PythonOperator(
    task_id='process_data',
    python_callable=process_data,
    dag=trigger_dag
)

# Trigger another DAG (the most basic usage)
trigger_next_dag = TriggerDagRunOperator(
    task_id='trigger_sensor_dag',
    trigger_dag_id='sensor_example_dag',
    wait_for_completion=False,  # do not wait for the triggered run to finish
    dag=trigger_dag
)

process_task >> trigger_next_dag

# ========================================================
# ExternalTaskSensor
# ========================================================
# Second DAG: waits for an external task
def analyze_triggered_data(**context):
    """Analysis step that runs after being triggered"""
    logging.info("Starting analysis after the external task completed")
    # Simple analysis logic
    analysis_result = {"status": "completed", "timestamp": str(datetime.now())}
    logging.info(f"Analysis complete: {analysis_result}")
    return analysis_result

sensor_dag = DAG(
    'sensor_example_dag',
    default_args=default_args,
    description='ExternalTaskSensor example',
    schedule=None,  # only runs when triggered
    catchup=False,
    tags=['cross-dag', 'sensor']
)

# Wait for a task in the external DAG to complete
def get_most_recent_dag_run(dt):
    """Find the most recent successful run of the external DAG"""
    from airflow.models import DagRun
    from airflow.utils.session import provide_session

    @provide_session
    def get_recent_run(session=None):
        recent_run = session.query(DagRun).filter(
            DagRun.dag_id == 'trigger_example_dag',
            DagRun.state == 'success'
        ).order_by(DagRun.execution_date.desc()).first()
        if recent_run:
            return recent_run.execution_date
        return dt

    return get_recent_run()

wait_for_external_task = ExternalTaskSensor(
    task_id='wait_for_process_data',
    external_dag_id='trigger_example_dag',      # DAG to wait on
    external_task_id='process_data',            # task to wait on
    execution_date_fn=get_most_recent_dag_run,  # match the most recent successful run
    timeout=60,                                 # 1-minute timeout
    poke_interval=5,                            # check every 5 seconds
    allowed_states=['success'],                 # only wait for the success state
    dag=sensor_dag
)

analyze_task = PythonOperator(
    task_id='analyze_triggered_data',
    python_callable=analyze_triggered_data,
    dag=sensor_dag
)

wait_for_external_task >> analyze_task

# ========================================================
# Dataset Dependencies
# ========================================================
# Dataset definitions (for Dataset Dependencies)
market_data_dataset = Dataset("market_data")
analysis_dataset = Dataset("analysis_result")

def create_market_data(**context):
    """Create market data and update the Dataset"""
    logging.info("Starting market data creation")
    # Actual data creation logic
    query = """
        SELECT symbol, AVG(price) as avg_price
        FROM stock_prices
        WHERE trade_date = CURDATE() - INTERVAL 1 DAY
        GROUP BY symbol
        LIMIT 3
    """
    result = execute_query(query)
    market_data = []
    if result:
        for row in result:
            market_data.append({"symbol": row[0], "avg_price": float(row[1])})
    logging.info(f"Market data created: {market_data}")
    return market_data

dataset_producer_dag = DAG(
    'dataset_producer_dag',
    default_args=default_args,
    description='Dataset Dependencies - producer',
    schedule='@daily',
    catchup=False,
    tags=['cross-dag', 'dataset', 'producer']
)

create_data_task = PythonOperator(
    task_id='create_market_data',
    python_callable=create_market_data,
    outlets=[market_data_dataset],  # announce the Dataset update
    dag=dataset_producer_dag
)

# Consumer
def consume_market_data(**context):
    """Consume and analyze the produced market data"""
    logging.info("Starting market data consumption and analysis")
    # Runs automatically whenever the Dataset is updated
    query = "SELECT COUNT(DISTINCT symbol) FROM stock_prices"
    result = execute_query(query)
    symbol_count = result[0][0] if result else 0
    analysis = {
        "total_symbols": symbol_count,
        "analysis_time": str(datetime.now()),
        "status": "Dataset-based analysis complete"
    }
    logging.info(f"Dataset-based analysis result: {analysis}")
    return analysis

def generate_final_report(**context):
    """Generate the final report"""
    logging.info("Generating final report")
    ti = context['ti']
    analysis = ti.xcom_pull(task_ids='consume_market_data')
    report = f"Dataset-based report: {analysis.get('total_symbols', 0)} symbols analyzed"
    logging.info(report)
    return report

dataset_consumer_dag = DAG(
    'dataset_consumer_dag',
    default_args=default_args,
    description='Dataset Dependencies - consumer',
    schedule=[market_data_dataset],  # runs when the Dataset is updated
    catchup=False,
    tags=['cross-dag', 'dataset', 'consumer']
)

consume_task = PythonOperator(
    task_id='consume_market_data',
    python_callable=consume_market_data,
    outlets=[analysis_dataset],  # updates the next Dataset
    dag=dataset_consumer_dag
)

report_task = PythonOperator(
    task_id='generate_final_report',
    python_callable=generate_final_report,
    dag=dataset_consumer_dag
)

consume_task >> report_task
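As an aside, the sensor-plus-execution_date_fn combination above can sometimes be avoided entirely: TriggerDagRunOperator can itself block until the triggered run finishes. A minimal sketch, assuming the same DAG ids as above:

# Hedged alternative: let the trigger wait on the downstream run itself
trigger_and_wait = TriggerDagRunOperator(
    task_id='trigger_and_wait_sensor_dag',
    trigger_dag_id='sensor_example_dag',
    wait_for_completion=True,  # block until the triggered run reaches a terminal state
    poke_interval=10,          # re-check every 10 seconds
    dag=trigger_dag
)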
6 Answers

dellahong (question author)

Slack exercise
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook


def send_error_to_slack(context):
    """
    Send a message to Slack when an error occurs
    """
    # Extract error details
    task_instance = context.get('task_instance')
    dag_id = context.get('dag').dag_id
    task_id = task_instance.task_id
    execution_date = context.get('execution_date')
    exception = context.get('exception')

    # Build the Slack message
    slack_message = {
        "text": "❌ Airflow Task Failed",
        "attachments": [
            {
                "color": "danger",
                "fields": [
                    {"title": "DAG", "value": dag_id, "short": True},
                    {"title": "Task", "value": task_id, "short": True},
                    {"title": "Time", "value": str(execution_date), "short": True},
                    {
                        "title": "Error Message",
                        "value": str(exception) if exception else "Unknown error",
                        "short": False
                    }
                ]
            }
        ]
    }

    # Send to Slack
    try:
        slack_hook = SlackWebhookHook(
            slack_webhook_conn_id='slack_default'
        )
        slack_hook.send_dict(slack_message)
        print("Error notification sent to Slack")
    except Exception as e:
        print(f"Failed to send Slack notification: {e}")


# Test helpers
def task_that_fails():
    """A task that fails"""
    raise Exception("This task intentionally fails for testing")


def task_that_succeeds():
    """A task that succeeds (no notification)"""
    print("Task completed successfully")
    return "success"


# DAG settings
default_args = {
    'owner': 'data-team',
    'start_date': datetime(2024, 1, 1),
    'retries': 0,  # no retries
    'on_failure_callback': send_error_to_slack,  # notify only on error
}

dag = DAG(
    'slack_error_notification',
    default_args=default_args,
    description='Slack notification on error',
    schedule='@daily',  # schedule_interval is deprecated in Airflow 2.4+
    catchup=False
)

# Task definitions
fail_task = PythonOperator(
    task_id='failing_task',
    python_callable=task_that_fails,
    dag=dag
)

success_task = PythonOperator(
    task_id='success_task',
    python_callable=task_that_succeeds,
    dag=dag
)

# Task execution order
success_task >> fail_task
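For this to work, a slack_default connection holding the incoming-webhook URL has to exist in Airflow. A minimal sketch for verifying the connection outside a DAG, assuming the apache-airflow-providers-slack package and an already-created connection (hook.send() is available in recent provider versions):

# Hypothetical one-off check, e.g. in a scratch script on the Airflow host
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook

hook = SlackWebhookHook(slack_webhook_conn_id='slack_default')
hook.send(text="Airflow → Slack webhook test")  # should appear in the channel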
dellahong (question author)

Parallel
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator  # DummyOperator is deprecated
import logging
import time
import random
from concurrent.futures import ThreadPoolExecutor, as_completed

default_args = {
    'owner': 'data-team',
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=2),
}


def generate_data(total_records=500):
    symbols = ['AAPL', 'GOOGL', 'MSFT']
    data = []
    for i in range(total_records):
        data.append({
            'id': i + 1,
            'symbol': random.choice(symbols),
            'price': round(random.uniform(100, 300), 2),
            'volume': random.randint(1000, 5000)
        })
    logging.info(f"Generated {len(data)} records")
    return data


def create_batches(data, batch_size):
    batches = []
    for i in range(0, len(data), batch_size):
        batches.append(data[i:i + batch_size])
    logging.info(f"Created {len(batches)} batches")
    return batches


def process_batch(batch_data, batch_id):
    time.sleep(1)  # real work (I/O, queries, etc.) would normally go here
    total_volume = sum(record['volume'] for record in batch_data)
    result = {
        'batch_id': batch_id,
        'count': len(batch_data),
        'total_volume': total_volume
    }
    logging.info(f"Batch {batch_id} done")
    return result


def parallel_processing():
    start_time = time.time()
    data = generate_data(500)
    batches = create_batches(data, 50)
    results = []
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = {
            executor.submit(process_batch, batch, f"batch_{i+1}"): i
            for i, batch in enumerate(batches)
        }
        for future in as_completed(futures):
            result = future.result()
            results.append(result)
    processing_time = round(time.time() - start_time, 2)
    logging.info(f"Parallel processing done: {processing_time}s, {len(results)} batches total")
    return {
        'time': processing_time,
        'batches': len(batches),
        'results': results
    }


parallel_dag = DAG(
    'parallel_processing_dag',
    default_args=default_args,
    description='Parallel processing example',
    schedule='@daily',
    catchup=False,
    tags=['parallel']
)

start = EmptyOperator(task_id='start', dag=parallel_dag)

parallel_task = PythonOperator(
    task_id='parallel_task',
    python_callable=parallel_processing,
    dag=parallel_dag
)

end = EmptyOperator(task_id='end', dag=parallel_dag)

start >> parallel_task >> end
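The ThreadPoolExecutor here parallelizes inside a single task, so all batches run on one worker. If you want Airflow itself to fan the batches out across task instances, Dynamic Task Mapping (Airflow 2.3+) is worth a look. A minimal sketch reusing the helpers above (the @task names are hypothetical, not from the lecture):

from airflow.decorators import task

@task
def make_batches():
    # returns a list; each element becomes one mapped task instance
    return create_batches(generate_data(500), 50)

@task
def process_one(batch):
    return process_batch(batch, batch_id='mapped')

# Inside a DAG context: one process_one instance per batch
# process_one.expand(batch=make_batches())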
dellahong (question author)

Custom Operator
from datetime import datetime, timedelta
from airflow import DAG
from airflow.models import BaseOperator
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator  # DummyOperator is deprecated
from db_utils import execute_query
import logging

# Default settings
default_args = {
    'owner': 'data-team',
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=2),
}

custom_operator_dag = DAG(
    'custom_operator_dag',
    default_args=default_args,
    description='Simple Custom Operator example',
    schedule='@daily',
    catchup=False,
    tags=['custom-operator', 'simple']
)

start = EmptyOperator(task_id='start', dag=custom_operator_dag)

# ================= Custom Operator =================
class StockDataOperator(BaseOperator):
    """Custom operator that fetches stock data"""

    # @apply_defaults is deprecated since Airflow 2.0; default_args are applied automatically
    def __init__(self, symbol, days=7, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.symbol = symbol
        self.days = days

    def execute(self, context):
        """The method Airflow actually runs"""
        logging.info(f"Fetching stock data for {self.symbol}")
        query = """
            SELECT symbol, COUNT(*) as count, AVG(price) as avg_price
            FROM stock_prices
            WHERE symbol = %s
              AND trade_date >= CURDATE() - INTERVAL %s DAY
            GROUP BY symbol
        """
        result = execute_query(query, [self.symbol, self.days])
        if result:
            data = {
                'symbol': result[0][0],
                'count': result[0][1],
                'avg_price': float(result[0][2]) if result[0][2] else 0.0
            }
            logging.info(f"{self.symbol} fetched: {data}")
            return data
        else:
            logging.warning(f"No data for {self.symbol}")
            return {'symbol': self.symbol, 'count': 0, 'avg_price': 0.0}

# Using the custom operator
custom_task = StockDataOperator(
    task_id='extract_stock_data',
    symbol='AAPL',  # fixed: was the typo 'APPL', which matches no rows
    days=30,
    dag=custom_operator_dag
)

# ================= Plain PythonOperator =================
# Comparison with the traditional PythonOperator approach
def traditional_function(**context):
    """The traditional way"""
    logging.info("Plain PythonOperator approach")
    query = "SELECT COUNT(*) FROM stock_prices WHERE symbol = 'AAPL'"
    result = execute_query(query)
    count = result[0][0] if result else 0
    return {'symbol': 'AAPL', 'count': count}

traditional_task = PythonOperator(
    task_id='traditional_approach',
    python_callable=traditional_function,
    dag=custom_operator_dag
)

end = EmptyOperator(task_id='end', dag=custom_operator_dag)

# Task dependencies
start >> [custom_task, traditional_task] >> end
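One small extension worth knowing: a custom operator can declare template_fields so its arguments accept Jinja templates. A hedged sketch of how StockDataOperator could be adapted (this subclass is illustrative, not part of the original code):

class TemplatedStockDataOperator(StockDataOperator):
    # fields listed here are rendered with Jinja before execute() runs
    template_fields = ('symbol',)

# usage: symbol can now come from params or macros, e.g.
# TemplatedStockDataOperator(task_id='t', symbol="{{ params.symbol }}", dag=...)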
dellahong (question author)

TaskGroup
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator  # DummyOperator is deprecated
from airflow.utils.task_group import TaskGroup
from db_utils import execute_query
import logging

# Default settings
default_args = {
    'owner': 'data-team',
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=2),
}


def extract_data(symbol, **context):
    """Extract data"""
    logging.info(f"Extracting data for {symbol}")
    query = "SELECT COUNT(*) FROM stock_prices WHERE symbol = %s"
    result = execute_query(query, [symbol])
    count = result[0][0] if result else 0
    logging.info(f"{symbol}: {count} rows extracted")
    return {'symbol': symbol, 'count': count}


def process_data(symbol, **context):
    """Process data"""
    logging.info(f"Processing data for {symbol}")
    ti = context['ti']  # Task Instance (TI)
    # task_ids include the TaskGroup prefix
    extracted = ti.xcom_pull(task_ids=f'extract_group.extract_{symbol.lower()}')
    processed = {
        'symbol': symbol,
        'count': extracted['count'],
        'status': 'processed' if extracted['count'] > 0 else 'empty'
    }
    logging.info(f"{symbol} processed: {processed}")
    return processed


def generate_report(**context):
    """Generate the overall report"""
    logging.info("Starting report generation")
    ti = context['ti']
    symbols = ['AAPL', 'GOOGL']
    total_count = 0
    for symbol in symbols:
        # nested groups stack their prefixes in the task_id
        processed = ti.xcom_pull(task_ids=f'process_group.nested_group.process_{symbol.lower()}')
        if processed:
            total_count += processed['count']
    report = f"Processed {total_count} rows in total"
    logging.info(report)
    return report


# TaskGroup DAG
taskgroup_dag = DAG(
    'taskgroup_simple_dag',
    default_args=default_args,
    description='Simple TaskGroup example',
    schedule='@daily',
    catchup=False,
    tags=['taskgroup', 'simple']
)

start = EmptyOperator(task_id='start', dag=taskgroup_dag)

# Basic TaskGroup
with TaskGroup(group_id='extract_group', dag=taskgroup_dag) as extract_group:
    symbols = ['AAPL', 'GOOGL']
    for symbol in symbols:
        PythonOperator(
            task_id=f'extract_{symbol.lower()}',
            python_callable=extract_data,
            op_kwargs={'symbol': symbol},
            dag=taskgroup_dag
        )

with TaskGroup(group_id='process_group', dag=taskgroup_dag) as process_group:
    # Nested TaskGroup
    with TaskGroup(group_id='nested_group', dag=taskgroup_dag) as nested_group:
        for symbol in symbols:
            PythonOperator(
                task_id=f'process_{symbol.lower()}',
                python_callable=process_data,
                op_kwargs={'symbol': symbol},
                dag=taskgroup_dag
            )

    # Report task
    report_task = PythonOperator(
        task_id='generate_report',
        python_callable=generate_report,
        dag=taskgroup_dag
    )

    # Run the report after the nested group completes
    nested_group >> report_task

end = EmptyOperator(task_id='end', dag=taskgroup_dag)

start >> extract_group >> process_group >> end
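All of the snippets above import execute_query from db_utils, which also isn't in the provided materials. A minimal sketch of what that helper might look like, assuming a MySQL backend reachable through an Airflow connection (the conn_id 'stock_db' is a placeholder; adjust it to your setup):

# db_utils.py - hypothetical reconstruction, not the course's actual file
from airflow.providers.mysql.hooks.mysql import MySqlHook

def execute_query(query, params=None):
    """Run a query; return all rows for SELECTs, commit and return None otherwise."""
    hook = MySqlHook(mysql_conn_id='stock_db')  # placeholder conn_id
    conn = hook.get_conn()
    try:
        with conn.cursor() as cursor:
            cursor.execute(query, params or [])
            if cursor.description:  # SELECT-style statements produce a description
                return cursor.fetchall()
            conn.commit()           # INSERT/UPDATE statements need a commit
            return None
    finally:
        conn.close()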
The source code for the Section 7 exercises isn't in the provided materials either~