강의에서사용하신 root.py 파일이 안보여서 실습하면서 만든 텍스트 공유 드려요
0
dynamicDag
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from db_utils import execute_query
import logging
def log_task_start(message):
logging.info(f"[START] {message}")
def log_task_end(message):
logging.info(f"[END] {message}")
# 분석할 주식 종목 리스트 (설정으로 관리)
STOCK_SYMBOLS = ['AAPL','GOOGL','MSFT','AMZN','TSLA']
default_args = {
'owner': 'data-team',
'depends_on_past': False,
'start_date': datetime(2024, 1, 1),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
def analyze_stock_data(symbol, **context):
"""주식 종목에 대해서, 최근 30일간의 주식 데이터를 분석"""
log_task_start(f"주식 분석 - {symbol}")
query = """
SELECT
symbol,
AVG(price) as avg_price,
MAX(price) as max_price,
MIN(price) as min_price,
SUM(volume) as total_volume,
COUNT(*) as trading_days
FROM stock_prices
WHERE symbol = %s
AND trade_date >= DATE_SUB(CURDATE(), INTERVAL 30 DAY)
GROUP BY symbol
"""
result = execute_query(query, [symbol])
if result:
data = result[0]
print(f"{symbol} 분석 결과:")
print(f" 평균 가격: ${data[1]:.2f}")
print(f" 최고 가격: ${data[2]:.2f}")
print(f" 최저 가격: ${data[3]:.2f}")
print(f" 총 거래량: {data[4]:,}")
print(f" 거래일수: {data[5]}일")
log_task_end(f"주식 분석 - {symbol}")
return result
def create_stock_dag(symbol):
"""개별 주식에 대한 DAG 생성 함수"""
dag_id = f'stock_analysis_{symbol.lower()}'
dag = DAG(
dag_id,
default_args=default_args,
description=f'{symbol} 주식 데이터 분석 DAG',
schedule='@daily',
catchup=False,
tags=['dynamic', 'stock', 'analysis', symbol]
)
# 데이터 품질 체크 태스크
def check_data_quality(**context):
query = """
SELECT COUNT(*) as count
FROM stock_prices
WHERE symbol = %s
AND trade_date = CURDATE() - INTERVAL 1 DAY
"""
result = execute_query(query, [symbol])
logging.info(f"{symbol} 데이터 품질 체크: {result[0][0] if result else 0}")
return result
data_quality_check = PythonOperator(
task_id=f'check_data_quality_{symbol.lower()}',
python_callable=check_data_quality,
dag=dag
)
# 주식 분석 태스크
analyze_task = PythonOperator(
task_id=f'analyze_{symbol.lower()}',
python_callable=analyze_stock_data,
op_kwargs={'symbol': symbol},
dag=dag
)
# 결과 저장 태스크
def save_results(**context):
query = """
INSERT INTO daily_stats (symbol, summary_date, avg_price, total_volume, trade_count)
SELECT
%s,
CURDATE() - INTERVAL 1 DAY,
AVG(price),
SUM(volume),
COUNT(*)
FROM stock_prices
WHERE symbol = %s
AND trade_date = CURDATE() - INTERVAL 1 DAY
ON DUPLICATE KEY UPDATE
avg_price = VALUES(avg_price),
total_volume = VALUES(total_volume),
trade_count = VALUES(trade_count)
"""
result = execute_query(query, [symbol, symbol])
logging.info(f"{symbol} 분석 결과 저장 완료")
return result
save_results = PythonOperator(
task_id=f'save_analysis_{symbol.lower()}',
python_callable=save_results,
dag=dag
)
# 태스크 의존성 설정
data_quality_check >> analyze_task >> save_results
return dag
# Dynamic DAG 생성
# 각 주식 종목에 대해 별도의 DAG 생성
for symbol in STOCK_SYMBOLS:
dag_id = f'stock_analysis_{symbol.lower()}'
globals()[dag_id] = create_stock_dag(symbol)CrossDag (트리거 센서)
from datetime import datetime, timedelta
from airflow import DAG, Dataset
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sensors.external_task import ExternalTaskSensor
from db_utils import execute_query
import logging
#기본 설정
default_args = {
'owner': 'data-team',
'start_date': datetime(2024, 1, 1),
'retries': 1,
'retry_delay': timedelta(minutes=2),
}
# ========================================================
# TriggerDagRunOperator
# ========================================================
def process_data(**context):
"""간단한 데이터 처리"""
logging.info("데이터 처리 시작")
query = "SELECT COUNT(*) FROM stock_prices"
result = execute_query(query)
count = result[0][0] if result else 0
logging.info(f"처리된 레코드 수 : ${count}")
return {"record_count": count}
trigger_dag = DAG(
'trigger_example_dag',
default_args=default_args,
description='TriggerDagRunOperator 예제',
schedule='@daily',
catchup=False,
tags=['cross-dag', 'trigger']
)
process_task = PythonOperator(
task_id='process_data',
python_callable=process_data,
dag=trigger_dag
)
#다른 DAG를 트리거 (가장 기본적인 사용법)
trigger_next_dag = TriggerDagRunOperator(
task_id='trigger_sensor_dag',
trigger_dag_id='sensor_example_dag',
wait_for_completion=False, # 완료를 기다리지 않음
dag=trigger_dag
)
process_task >> trigger_next_dag
# ========================================================
# ExternalTaskSensor
# ========================================================
# 두 번째 DAG: 외부 태스크 대기
def analyze_triggered_data(**context):
"""트리거된 후 분석 작업"""
logging.info("외부 태스크 완료 후 분석 시작")
# 간단한분석로직
analysis_result = {"status": "completed", "timestamp": str(datetime.now())}
logging.info(f"분석 완료: {analysis_result}")
return analysis_result
sensor_dag = DAG(
'sensor_example_dag',
default_args=default_args,
description='ExternalTaskSensor 예제',
schedule=None, # 트리거로만 실행됨
catchup=False,
tags=['cross-dag', 'sensor']
)
# 외부 DAG의 태스크 완료를 기다림
def get_most_recent_dag_run(dt):
"""가장 최근의 DAG 실행을 찾는 함수"""
from airflow.models import DagRun
from airflow.utils.session import provide_session
@provide_session
def get_recent_run(session=None):
recent_run = session.query(DagRun).filter(
DagRun.dag_id == 'trigger_example_dag',
DagRun.state == 'success'
).order_by(DagRun.execution_date.desc()).first()
if recent_run:
return recent_run.execution_date
return dt
return get_recent_run()
wait_for_external_task = ExternalTaskSensor(
task_id='wait_for_process_data',
external_dag_id='trigger_example_dag', # 기다릴 DAG
external_task_id='process_data', # 기다릴 태스크
execution_date_fn=get_most_recent_dag_run, #최근성공한 실행 DAG
timeout=60, # 1분 타임아웃
poke_interval=5, # 5초마다 확인
allowed_states=['success'], #성공 상태만 기다림
dag=sensor_dag
)
analyze_task = PythonOperator(
task_id='analyze_triggered_data',
python_callable=analyze_triggered_data,
dag=sensor_dag
)
wait_for_external_task >> analyze_task
# ========================================================
# Dataset Denpendencies
# ========================================================
# Dataset 정의 (Dataset Dependencies 용)
market_data_dataset = Dataset("market_data")
analysis_dataset = Dataset("analysis_result")
def create_market_data(**context):
"""마켓 데이터를 생성하고 Dataset을 업데이트"""
logging.info("마켓 데이터 생성 시작")
# 실제 데이터 생성 로직
query = """
SELECT symbol, AVG(price) as avg_price
FROM stock_prices
WHERE trade_date = CURDATE() - INTERVAL 1 DAY
GROUP BY symbol
LIMIT 3
"""
result = execute_query(query)
market_data = []
if result:
for row in result:
market_data.append({"symbol": row[0], "avg_price": float(row[1])})
logging.info(f"생성된 마켓 데이터: {market_data}")
return market_data
dataset_producer_dag = DAG(
'dataset_producer_dag',
default_args=default_args,
description='Dataset Dependencies - 생산자',
schedule='@daily',
catchup=False,
tags=['cross-dag', 'dataset', 'producer']
)
create_data_task = PythonOperator(
task_id='create_market_data',
python_callable=create_market_data,
outlets=[market_data_dataset], # Dataset 업데이트 알림
dag=dataset_producer_dag
)
# Consumer
def consume_market_data(**context):
"""생성된 마켓 데이터를 소비하여 분석"""
logging.info("마켓 데이터 소비 및 분석 시작")
# Dataset이 업데이트되면 자동으로 실행됨
query = "SELECT COUNT(DISTINCT symbol) FROM stock_prices"
result = execute_query(query)
symbol_count = result[0][0] if result else 0
analysis = {
"total_symbols": symbol_count,
"analysis_time": str(datetime.now()),
"status": "Dataset 기반 분석 완료"
}
logging.info(f"Dataset 기반 분석 결과: {analysis}")
return analysis
def generate_final_report(**context):
"""최종 리포트 생성"""
logging.info("최종 리포트 생성")
ti = context['ti']
analysis = ti.xcom_pull(task_ids='consume_market_data')
report = f"Dataset 기반 리포트: {analysis.get('total_symbols', 0)}개 심볼 분석 완료"
logging.info(report)
return report
dataset_consumer_dag = DAG(
'dataset_consumer_dag',
default_args=default_args,
description='Dataset Dependencies - 소비자',
schedule=[market_data_dataset], # Dataset이 업데이트되면 실행
catchup=False,
tags=['cross-dag', 'dataset', 'consumer']
)
consume_task = PythonOperator(
task_id='consume_market_data',
python_callable=consume_market_data,
outlets=[analysis_dataset], # 다음 Dataset 업데이트
dag=dataset_consumer_dag
)
report_task = PythonOperator(
task_id='generate_final_report',
python_callable=generate_final_report,
dag=dataset_consumer_dag
)
consume_task >> report_task
빅데이터
docker
docker-compose
airflow
Answer 7
0
슬랙 실습
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
import json
def send_error_to_slack(context):
"""
에러 발생 시 Slack으로 메시지 전송
"""
# 에러 정보 추출
task_instance = context.get('task_instance')
dag_id = context.get('dag').dag_id
task_id = task_instance.task_id
execution_date = context.get('execution_date')
exception = context.get('exception')
# Slack 메시지 구성
slack_message = {
"text": "❌ Airflow Task Failed",
"attachments": [
{
"color": "danger",
"fields": [
{
"title": "DAG",
"value": dag_id,
"short": True
},
{
"title": "Task",
"value": task_id,
"short": True
},
{
"title": "Time",
"value": str(execution_date),
"short": True
},
{
"title": "Error Message",
"value": str(exception) if exception else "Unknown error",
"short": False
}
]
}
]
}
# Slack으로 전송
try:
slack_hook = SlackWebhookHook(
slack_webhook_conn_id='slack_default'
)
slack_hook.send_dict(slack_message)
print("Error notification sent to Slack")
except Exception as e:
print(f"Failed to send Slack notification: {e}")
# 테스트용 함수들
def task_that_fails():
"""실패하는 태스크"""
raise Exception("This task intentionally fails for testing")
def task_that_succeeds():
"""성공하는 태스크 (알림 없음)"""
print("Task completed successfully")
return "success"
# DAG 설정
default_args = {
'owner': 'data-team',
'start_date': datetime(2024, 1, 1),
'retries': 0, # 재시도 없음
'on_failure_callback': send_error_to_slack, # 에러 시에만 알림
}
dag = DAG(
'slack_error_notification',
default_args=default_args,
description='에러 발생 시 Slack 알림',
schedule_interval='@daily',
catchup=False
)
# 태스크 정의
fail_task = PythonOperator(
task_id='failing_task',
python_callable=task_that_fails,
dag=dag
)
success_task = PythonOperator(
task_id='success_task',
python_callable=task_that_succeeds,
dag=dag
)
# 태스크 실행 순서
success_task >> fail_task
0
혹시 dellahong님 어떤 root.py가 보이지 않는다고 하시는지 공유 가능하실까요??
0
Parallel
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import DummyOperator
import logging
import time
import random
from concurrent.futures import ThreadPoolExecutor, as_completed
default_args = {
'owner': 'data-team',
'start_date': datetime(2024, 1, 1),
'retries': 1,
'retry_delay': timedelta(minutes=2),
}
def generate_data(total_records=500):
symbols = ['AAPL', 'GOOGL', 'MSFT']
data = []
for i in range(total_records):
data.append({
'id': i + 1,
'symbol': random.choice(symbols),
'price': round(random.uniform(100, 300), 2),
'volume': random.randint(1000, 5000)
})
logging.info(f"데이터 생성: {len(data)}건")
return data
def create_batches(data, batch_size):
batches = []
for i in range(0, len(data), batch_size):
batches.append(data[i:i + batch_size])
logging.info(f"배치 생성: {len(batches)}개")
return batches
def process_batch(batch_data, batch_id):
time.sleep(1) #일련의 데이터 작업이 보통 들어감
total_volume = sum(record['volume'] for record in batch_data)
result = {
'batch_id': batch_id,
'count': len(batch_data),
'total_volume': total_volume
}
logging.info(f"배치 {batch_id} 완료")
return result
def parallel_processing():
start_time = time.time()
data = generate_data(500)
batches = create_batches(data, 50)
results = []
with ThreadPoolExecutor(max_workers=3) as executor:
futures = {
executor.submit(process_batch, batch, f"batch_{i+1}"): i
for i, batch in enumerate(batches)
}
for future in as_completed(futures):
result = future.result()
results.append(result)
processing_time = round(time.time() - start_time, 2)
logging.info(f"병렬 처리 완료: {processing_time}초, 총 {len(results)}개 배치")
return {
'time': processing_time,
'batches': len(batches),
'results': results
}
parallel_dag = DAG(
'parallel_processing_dag',
default_args=default_args,
description='병렬 처리 예제',
schedule='@daily',
catchup=False,
tags=['parallel']
)
start = DummyOperator(task_id='start', dag=parallel_dag)
parallel_task = PythonOperator(
task_id='parallel_task',
python_callable=parallel_processing,
dag=parallel_dag
)
end = DummyOperator(task_id='end', dag=parallel_dag)
start >> parallel_task >> end
0
Custom Operator
from datetime import datetime, timedelta
from airflow import DAG
from airflow.models import BaseOperator
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.decorators import apply_defaults
from db_utils import execute_query
import logging
#기본 설정
default_args = {
'owner': 'data-team',
'start_date': datetime(2024, 1, 1),
'retries': 1,
'retry_delay': timedelta(minutes=2),
}
custom_operator_dag = DAG(
'custom_operator_dag',
default_args=default_args,
description='Custom Operator 간단예제',
schedule='@daily',
catchup=False,
tags=['custom-operator', 'simple']
)
start = DummyOperator(task_id='start', dag=custom_operator_dag)
# ================= Custom Operator =================
class StockDataOperator(BaseOperator):
"""주식 데이터를 조회하는 커스텀 Operator"""
@apply_defaults
def __init__(self, symbol, days=7, *args, **kwargs):
super().__init__(*args, **kwargs)
self.symbol = symbol
self.days = days
def execute(self, context):
"""실제 실행되는 메소드"""
logging.info(f"{self.symbol} 주식 데이터 조회 시작")
query = """
SELECT symbol, COUNT(*) as count, AVG(price) as avg_price
FROM stock_prices
WHERE symbol = %s
AND trade_date >= CURDATE() - INTERVAL %s DAY
GROUP BY symbol
"""
result = execute_query(query, [self.symbol, self.days])
if result:
data = {
'symbol': result[0][0],
'count': result[0][1],
'avg_price': float(result[0][2]) if result[0][2] else 0.0
}
logging.info(f"{self.symbol} 조회완료 {data}")
return data
else:
logging.warning(f"{self.symbol} 데이터가 없습니다")
return {'symbol': self.symbol, 'count': 0, 'avg_price': 0.0}
#Custom Operator 사용
custom_task = StockDataOperator(
task_id='extract_stock_data',
symbol='APPL',
days=30,
dag=custom_operator_dag
)
# ================= 기존 Python Operator =================
# 기존 PythonOperator와 비교
def traditional_function(**context):
"""기존 방식의 함수"""
logging.info("기존 PythonOperator 방식")
query = "SELECT COUNT(*) FROM stock_prices WHERE symbol = 'AAPL'"
result = execute_query(query)
count = result[0][0] if result else 0
return {'symbol': 'AAPL', 'count': count}
traditional_task = PythonOperator(
task_id='traditional_approach',
python_callable=traditional_function,
dag=custom_operator_dag
)
end = DummyOperator(task_id='end', dag=custom_operator_dag)
#Task 의존성
start >> [custom_task, traditional_task] >> end
0
TaskGroup
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.task_group import TaskGroup
from db_utils import execute_query
import logging
#기본 설정
default_args = {
'owner': 'data-team',
'start_date': datetime(2024, 1, 1),
'retries': 1,
'retry_delay': timedelta(minutes=2),
}
def extract_data(symbol, **context):
"""데이터 추출"""
logging.info(f"{symbol} 데이터 추출")
query = "SELECT COUNT(*) FROM stock_prices WHERE symbol = %s"
result = execute_query(query, [symbol])
count = result[0][0] if result else 0
logging.info(f"{symbol}: {count}건 추출")
return {'symbol': symbol, 'count': count}
def process_data(symbol, **context):
"""데이터 처리"""
logging.info(f"{symbol} 데이터 처리")
ti = context['ti'] # Task Instance (TI)
extracted = ti.xcom_pull(task_ids=f'extract_group.extract_{symbol.lower()}')
processed = {
'symbol': symbol,
'count': extracted['count'],
'status': 'processed' if extracted['count'] > 0 else 'empty'
}
logging.info(f"{symbol} 처리 완료: {processed}")
return processed
def generate_report(**context):
"""전체 리포트 생성"""
logging.info("리포트 생성 시작")
ti = context['ti']
symbols = ['AAPL', 'GOOGL']
total_count = 0
for symbol in symbols:
processed = ti.xcom_pull(task_ids=f'process_group.nested_group.process_{symbol.lower()}')
if processed:
total_count += processed['count']
report = f"총 {total_count}건 데이터 처리 완료"
logging.info(report)
return report
# TaskGroup DAG
taskgroup_dag = DAG(
'taskgroup_simple_dag',
default_args=default_args,
description='TaskGroup 간단 예제',
schedule='@daily',
catchup=False,
tags=['taskgroup', 'simple']
)
start = DummyOperator(task_id='start', dag=taskgroup_dag)
# 기본 Task Group
with TaskGroup(group_id='extract_group', dag=taskgroup_dag) as extract_group:
symbols = ['AAPL', 'GOOGL']
for symbol in symbols:
PythonOperator(
task_id=f'extract_{symbol.lower()}',
python_callable=extract_data,
op_kwargs={'symbol': symbol},
dag=taskgroup_dag
)
with TaskGroup(group_id='process_group', dag=taskgroup_dag) as process_group:
# 중첩된 TaskGroup
with TaskGroup(group_id='nested_group', dag=taskgroup_dag) as nested_group:
for symbol in symbols:
PythonOperator(
task_id=f'process_{symbol.lower()}',
python_callable=process_data,
op_kwargs={'symbol': symbol},
dag=taskgroup_dag
)
# 리포트 태스크
report_task = PythonOperator(
task_id='generate_report',
python_callable=generate_report,
dag=taskgroup_dag
)
# 중첩 그룹 완료 후 리포트 실행
nested_group >> report_task
end = DummyOperator(task_id='end', dag=taskgroup_dag)
start >> extract_group >> process_group >> end
docker compose에 대해 질문드립니다.
0
8
1
작업형 1 유형 부분
0
9
1
작업형 1 (삭제예정, 구 버전)
0
28
2
수강기간 연장 문의드립니다.
0
20
1
2유형 레이블 인코딩 VS 원핫 인코딩
0
20
3
수강기간 연장 문의드립니다.
0
26
1
인덱스 슬라이싱
0
26
2
astro dev start - python 라이브러리 설치 fail
0
22
1
맥북 환경구성 에러
0
19
2
Free Edition 실습 영상은 아직 업데이트전인가요?
0
22
1
질문 드립니다.
0
45
2
강의 내용 관련 질문드립니다~
0
43
2
수강 연장 문의
0
54
2
강의자료 일괄 다운로드
0
49
2
list 문제 질문드립니다~
0
34
2
빅분기 실기 12회 재도전
0
52
2
강의 기간 연장 가능여부 검토 요청건
0
38
2
수강기간 연장 문의 드립니다
0
43
2
JPA Repository 질문이 있습니다!
1
33
2
페이지네이션 처리를 쿼리에서 하는 방식 질문
1
36
1
수강기간 연장 문의드립니다
0
50
2
디바이스 페어링 문의
0
32
2
질문이요
0
55
2
실습환경에 대해 질문이 있습니다!
0
65
2

