인프런 커뮤니티 질문&답변
강의에서사용하신 root.py 파일이 안보여서 실습하면서 만든 텍스트 공유 드려요
작성
·
8
·
수정됨
0
dynamicDag
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from db_utils import execute_query
import logging
def log_task_start(message):
logging.info(f"[START] {message}")
def log_task_end(message):
logging.info(f"[END] {message}")
# 분석할 주식 종목 리스트 (설정으로 관리)
STOCK_SYMBOLS = ['AAPL','GOOGL','MSFT','AMZN','TSLA']
default_args = {
'owner': 'data-team',
'depends_on_past': False,
'start_date': datetime(2024, 1, 1),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
def analyze_stock_data(symbol, **context):
"""주식 종목에 대해서, 최근 30일간의 주식 데이터를 분석"""
log_task_start(f"주식 분석 - {symbol}")
query = """
SELECT
symbol,
AVG(price) as avg_price,
MAX(price) as max_price,
MIN(price) as min_price,
SUM(volume) as total_volume,
COUNT(*) as trading_days
FROM stock_prices
WHERE symbol = %s
AND trade_date >= DATE_SUB(CURDATE(), INTERVAL 30 DAY)
GROUP BY symbol
"""
result = execute_query(query, [symbol])
if result:
data = result[0]
print(f"{symbol} 분석 결과:")
print(f" 평균 가격: ${data[1]:.2f}")
print(f" 최고 가격: ${data[2]:.2f}")
print(f" 최저 가격: ${data[3]:.2f}")
print(f" 총 거래량: {data[4]:,}")
print(f" 거래일수: {data[5]}일")
log_task_end(f"주식 분석 - {symbol}")
return result
def create_stock_dag(symbol):
"""개별 주식에 대한 DAG 생성 함수"""
dag_id = f'stock_analysis_{symbol.lower()}'
dag = DAG(
dag_id,
default_args=default_args,
description=f'{symbol} 주식 데이터 분석 DAG',
schedule='@daily',
catchup=False,
tags=['dynamic', 'stock', 'analysis', symbol]
)
# 데이터 품질 체크 태스크
def check_data_quality(**context):
query = """
SELECT COUNT(*) as count
FROM stock_prices
WHERE symbol = %s
AND trade_date = CURDATE() - INTERVAL 1 DAY
"""
result = execute_query(query, [symbol])
logging.info(f"{symbol} 데이터 품질 체크: {result[0][0] if result else 0}")
return result
data_quality_check = PythonOperator(
task_id=f'check_data_quality_{symbol.lower()}',
python_callable=check_data_quality,
dag=dag
)
# 주식 분석 태스크
analyze_task = PythonOperator(
task_id=f'analyze_{symbol.lower()}',
python_callable=analyze_stock_data,
op_kwargs={'symbol': symbol},
dag=dag
)
# 결과 저장 태스크
def save_results(**context):
query = """
INSERT INTO daily_stats (symbol, summary_date, avg_price, total_volume, trade_count)
SELECT
%s,
CURDATE() - INTERVAL 1 DAY,
AVG(price),
SUM(volume),
COUNT(*)
FROM stock_prices
WHERE symbol = %s
AND trade_date = CURDATE() - INTERVAL 1 DAY
ON DUPLICATE KEY UPDATE
avg_price = VALUES(avg_price),
total_volume = VALUES(total_volume),
trade_count = VALUES(trade_count)
"""
result = execute_query(query, [symbol, symbol])
logging.info(f"{symbol} 분석 결과 저장 완료")
return result
save_results = PythonOperator(
task_id=f'save_analysis_{symbol.lower()}',
python_callable=save_results,
dag=dag
)
# 태스크 의존성 설정
data_quality_check >> analyze_task >> save_results
return dag
# Dynamic DAG 생성
# 각 주식 종목에 대해 별도의 DAG 생성
for symbol in STOCK_SYMBOLS:
dag_id = f'stock_analysis_{symbol.lower()}'
globals()[dag_id] = create_stock_dag(symbol)CrossDag (트리거 센서)
from datetime import datetime, timedelta
from airflow import DAG, Dataset
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sensors.external_task import ExternalTaskSensor
from db_utils import execute_query
import logging
default_args = {
'owner': 'data-team',
'start_date': datetime(2024, 1, 1),
'retries': 1,
'retry_delay': timedelta(minutes=2),
}
def process_data(**context):
"""간단한 데이터 처리"""
logging.info("데이터 처리 시작")
query = "SELECT COUNT(*) FROM stock_prices"
result = execute_query(query)
count = result[0][0] if result else 0
logging.info(f"처리된 레코드 수 : ${count}")
return {"record_count": count}
trigger_dag = DAG(
'trigger_example_dag',
default_args=default_args,
description='TriggerDagRunOperator 예제',
schedule='@daily',
catchup=False,
tags=['cross-dag', 'trigger']
)
process_task = PythonOperator(
task_id='process_data',
python_callable=process_data,
dag=trigger_dag
)
trigger_next_dag = TriggerDagRunOperator(
task_id='trigger_sensor_dag',
trigger_dag_id='sensor_example_dag',
wait_for_completion=False, # 완료를 기다리지 않음
dag=trigger_dag
)
process_task >> trigger_next_dag
# ========================================================
# 두 번째 DAG: 외부 태스크 대기
def analyze_triggered_data(**context):
"""트리거된 후 분석 작업"""
logging.info("외부 태스크 완료 후 분석 시작")
analysis_result = {
"status": "completed",
"timestamp": str(datetime.now())
}
logging.info(f"분석 완료: {analysis_result}")
return analysis_result
sensor_dag = DAG(
'sensor_example_dag',
default_args=default_args,
description='ExternalTaskSensor 예제',
schedule=None, # 트리거로만 실행됨
catchup=False,
tags=['cross-dag', 'sensor']
)
# 외부 DAG의 태스크 완료를 기다림
def get_most_recent_dag_run(dt):
"""가장 최근의 DAG 실행을 찾는 함수"""
from airflow.models import DagRun
from airflow.utils.session import provide_session
@provide_session
def get_recent_run(session=None):
recent_run = session.query(DagRun).filter(
DagRun.dag_id == 'trigger_example_dag',
DagRun.state == 'success'
).order_by(DagRun.execution_date.desc()).first()
if recent_run:
return recent_run.execution_date
return dt
return get_recent_run()
wait_for_external_task = ExternalTaskSensor(
task_id='wait_for_process_data',
external_dag_id='trigger_example_dag', # 기다릴 DAG
external_task_id='process_data', # 기다릴 태스크
execution_date_fn=get_most_recent_dag_run,
timeout=60, # 1분 타임아웃
poke_interval=5, # 5초마다 확인
allowed_states=['success'],
dag=sensor_dag
)
analyze_task = PythonOperator(
task_id='analyze_triggered_data',
python_callable=analyze_triggered_data,
dag=sensor_dag
)
wait_for_external_task >> analyze_task
답변
답변을 기다리고 있는 질문이에요
첫번째 답변을 남겨보세요!




