dellahong
@dellahong
Course reviews written
1
Average rating
5.0
Posts
Q&A
Since I couldn't find the root.py file used in the lecture, I'm sharing the text I wrote while following along
When I extracted the archive with the built-in Windows unzip tool, the practice-code folder didn't appear! I thought the practice code was missing, but after checking the practice files again, everything was there.
Q&A
Since I couldn't find the root.py file used in the lecture, I'm sharing the text I wrote while following along
Slack practice

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook


def send_error_to_slack(context):
    """Send a message to Slack when a task fails."""
    # Pull error details from the task context
    task_instance = context.get('task_instance')
    dag_id = context.get('dag').dag_id
    task_id = task_instance.task_id
    execution_date = context.get('execution_date')
    exception = context.get('exception')

    # Build the Slack message payload
    slack_message = {
        "text": "❌ Airflow Task Failed",
        "attachments": [
            {
                "color": "danger",
                "fields": [
                    {"title": "DAG", "value": dag_id, "short": True},
                    {"title": "Task", "value": task_id, "short": True},
                    {"title": "Time", "value": str(execution_date), "short": True},
                    {
                        "title": "Error Message",
                        "value": str(exception) if exception else "Unknown error",
                        "short": False
                    }
                ]
            }
        ]
    }

    # Send to Slack
    try:
        slack_hook = SlackWebhookHook(slack_webhook_conn_id='slack_default')
        slack_hook.send_dict(slack_message)
        print("Error notification sent to Slack")
    except Exception as e:
        print(f"Failed to send Slack notification: {e}")


# Test helper functions
def task_that_fails():
    """A task that always fails."""
    raise Exception("This task intentionally fails for testing")


def task_that_succeeds():
    """A task that succeeds (no notification)."""
    print("Task completed successfully")
    return "success"


# DAG configuration
default_args = {
    'owner': 'data-team',
    'start_date': datetime(2024, 1, 1),
    'retries': 0,  # no retries
    'on_failure_callback': send_error_to_slack,  # notify only on failure
}

dag = DAG(
    'slack_error_notification',
    default_args=default_args,
    description='Slack notification on task failure',
    schedule_interval='@daily',
    catchup=False
)

# Task definitions
fail_task = PythonOperator(
    task_id='failing_task',
    python_callable=task_that_fails,
    dag=dag
)

success_task = PythonOperator(
    task_id='success_task',
    python_callable=task_that_succeeds,
    dag=dag
)

# Task execution order
success_task >> fail_task
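The callback above assumes an Airflow connection named slack_default already exists. A minimal sketch of one way to register it programmatically (my addition; the placeholder webhook URL and the choice of storing it in the password field are assumptions that can vary across slack-provider versions):

# Hypothetical bootstrap snippet, not part of the shared DAG file.
# 'slackwebhook' is the Slack provider's incoming-webhook connection type;
# the URL below is a placeholder, and which field holds the webhook
# (password vs. host) depends on the provider version.
from airflow.models import Connection
from airflow.utils.session import create_session

with create_session() as session:
    exists = session.query(Connection).filter(
        Connection.conn_id == 'slack_default'
    ).first()
    if not exists:
        session.add(Connection(
            conn_id='slack_default',
            conn_type='slackwebhook',
            password='https://hooks.slack.com/services/T000/B000/XXXX',
        ))

The same connection can also be created in the Airflow UI (Admin > Connections) without any code.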
Q&A
Since I couldn't find the root.py file used in the lecture, I'm sharing the text I wrote while following along
Parallel

from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor, as_completed
import logging
import random
import time

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import DummyOperator

default_args = {
    'owner': 'data-team',
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=2),
}


def generate_data(total_records=500):
    """Generate random sample stock records."""
    symbols = ['AAPL', 'GOOGL', 'MSFT']
    data = []
    for i in range(total_records):
        data.append({
            'id': i + 1,
            'symbol': random.choice(symbols),
            'price': round(random.uniform(100, 300), 2),
            'volume': random.randint(1000, 5000)
        })
    logging.info(f"Generated {len(data)} records")
    return data


def create_batches(data, batch_size):
    """Split the data into fixed-size batches."""
    batches = []
    for i in range(0, len(data), batch_size):
        batches.append(data[i:i + batch_size])
    logging.info(f"Created {len(batches)} batches")
    return batches


def process_batch(batch_data, batch_id):
    time.sleep(1)  # a real series of data operations would usually go here
    total_volume = sum(record['volume'] for record in batch_data)
    result = {
        'batch_id': batch_id,
        'count': len(batch_data),
        'total_volume': total_volume
    }
    logging.info(f"Batch {batch_id} done")
    return result


def parallel_processing():
    """Process all batches concurrently with a thread pool."""
    start_time = time.time()
    data = generate_data(500)
    batches = create_batches(data, 50)

    results = []
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = {
            executor.submit(process_batch, batch, f"batch_{i+1}"): i
            for i, batch in enumerate(batches)
        }
        for future in as_completed(futures):
            result = future.result()
            results.append(result)

    processing_time = round(time.time() - start_time, 2)
    logging.info(f"Parallel processing finished: {processing_time}s, {len(results)} batches total")
    return {
        'time': processing_time,
        'batches': len(batches),
        'results': results
    }


parallel_dag = DAG(
    'parallel_processing_dag',
    default_args=default_args,
    description='Parallel processing example',
    schedule='@daily',
    catchup=False,
    tags=['parallel']
)

start = DummyOperator(task_id='start', dag=parallel_dag)

parallel_task = PythonOperator(
    task_id='parallel_task',
    python_callable=parallel_processing,
    dag=parallel_dag
)

end = DummyOperator(task_id='end', dag=parallel_dag)

start >> parallel_task >> end
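Because process_batch is sleep/I-O bound, threads give a near-linear speedup here (time.sleep releases the GIL). A standalone timing sketch of that difference, outside Airflow and with batch/worker counts chosen just for illustration:

# Standalone timing sketch (my addition, not from the lecture files):
# the same sleep-bound work run sequentially and with a 3-worker pool.
import time
from concurrent.futures import ThreadPoolExecutor


def fake_batch(batch_id):
    time.sleep(1)  # stand-in for I/O-bound batch work
    return batch_id


if __name__ == '__main__':
    # Sequential: 10 batches x 1s each, roughly 10s total
    t0 = time.time()
    for i in range(10):
        fake_batch(i)
    print(f"sequential: {time.time() - t0:.1f}s")

    # 3 workers: ceil(10 / 3) = 4 waves, roughly 4s total
    t0 = time.time()
    with ThreadPoolExecutor(max_workers=3) as ex:
        list(ex.map(fake_batch, range(10)))
    print(f"threaded:   {time.time() - t0:.1f}s")

For CPU-bound batches a thread pool would not help much; splitting the work across separate Airflow tasks (or a ProcessPoolExecutor) would fit better.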
Q&A
Since I couldn't find the root.py file used in the lecture, I'm sharing the text I wrote while following along
Custom Operator

from datetime import datetime, timedelta
import logging

from airflow import DAG
from airflow.models import BaseOperator
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.decorators import apply_defaults

from db_utils import execute_query

# Base configuration
default_args = {
    'owner': 'data-team',
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=2),
}

custom_operator_dag = DAG(
    'custom_operator_dag',
    default_args=default_args,
    description='Simple Custom Operator example',
    schedule='@daily',
    catchup=False,
    tags=['custom-operator', 'simple']
)

start = DummyOperator(task_id='start', dag=custom_operator_dag)


# ================= Custom Operator =================
class StockDataOperator(BaseOperator):
    """Custom operator that queries stock data."""

    @apply_defaults  # deprecated in Airflow 2.x, where defaults are applied automatically
    def __init__(self, symbol, days=7, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.symbol = symbol
        self.days = days

    def execute(self, context):
        """The method Airflow actually runs."""
        logging.info(f"Fetching stock data for {self.symbol}")
        query = """
            SELECT symbol, COUNT(*) as count, AVG(price) as avg_price
            FROM stock_prices
            WHERE symbol = %s
              AND trade_date >= CURDATE() - INTERVAL %s DAY
            GROUP BY symbol
        """
        result = execute_query(query, [self.symbol, self.days])

        if result:
            data = {
                'symbol': result[0][0],
                'count': result[0][1],
                'avg_price': float(result[0][2]) if result[0][2] else 0.0
            }
            logging.info(f"{self.symbol} fetched: {data}")
            return data
        else:
            logging.warning(f"No data for {self.symbol}")
            return {'symbol': self.symbol, 'count': 0, 'avg_price': 0.0}


# Using the custom operator
custom_task = StockDataOperator(
    task_id='extract_stock_data',
    symbol='AAPL',
    days=30,
    dag=custom_operator_dag
)


# ================= Plain PythonOperator for comparison =================
def traditional_function(**context):
    """The traditional approach with a plain function."""
    logging.info("Traditional PythonOperator approach")
    query = "SELECT COUNT(*) FROM stock_prices WHERE symbol = 'AAPL'"
    result = execute_query(query)
    count = result[0][0] if result else 0
    return {'symbol': 'AAPL', 'count': count}


traditional_task = PythonOperator(
    task_id='traditional_approach',
    python_callable=traditional_function,
    dag=custom_operator_dag
)

end = DummyOperator(task_id='end', dag=custom_operator_dag)

# Task dependencies
start >> [custom_task, traditional_task] >> end
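The db_utils module imported here isn't among the shared files. A minimal sketch of what its execute_query might look like, assuming a MySQL backend (the %s placeholders and CURDATE() in the query suggest MySQL) and a connection registered in Airflow as mysql_default; both the module shape and the connection id are my assumptions:

# db_utils.py -- hypothetical sketch, not the lecture's actual module.
from airflow.providers.mysql.hooks.mysql import MySqlHook


def execute_query(query, params=None):
    """Run a query against the 'mysql_default' connection and
    return all rows as a list of tuples."""
    hook = MySqlHook(mysql_conn_id='mysql_default')
    # get_records handles opening and closing the connection
    return hook.get_records(query, parameters=params)

Returning rows as tuples matches how the code above indexes into result[0][0].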
Q&A
Since I couldn't find the root.py file used in the lecture, I'm sharing the text I wrote while following along
TaskGroup

from datetime import datetime, timedelta
import logging

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.task_group import TaskGroup

from db_utils import execute_query

# Base configuration
default_args = {
    'owner': 'data-team',
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=2),
}


def extract_data(symbol, **context):
    """Extract data."""
    logging.info(f"Extracting data for {symbol}")
    query = "SELECT COUNT(*) FROM stock_prices WHERE symbol = %s"
    result = execute_query(query, [symbol])
    count = result[0][0] if result else 0
    logging.info(f"{symbol}: {count} rows extracted")
    return {'symbol': symbol, 'count': count}


def process_data(symbol, **context):
    """Process data."""
    logging.info(f"Processing data for {symbol}")
    ti = context['ti']  # Task Instance (TI)
    # Task ids inside a TaskGroup are prefixed with the group id
    extracted = ti.xcom_pull(task_ids=f'extract_group.extract_{symbol.lower()}')
    processed = {
        'symbol': symbol,
        'count': extracted['count'],
        'status': 'processed' if extracted['count'] > 0 else 'empty'
    }
    logging.info(f"{symbol} processed: {processed}")
    return processed


def generate_report(**context):
    """Generate the overall report."""
    logging.info("Generating report")
    ti = context['ti']
    symbols = ['AAPL', 'GOOGL']
    total_count = 0
    for symbol in symbols:
        # Nested groups stack their prefixes: outer_group.inner_group.task_id
        processed = ti.xcom_pull(
            task_ids=f'process_group.nested_group.process_{symbol.lower()}'
        )
        if processed:
            total_count += processed['count']
    report = f"Processed {total_count} rows in total"
    logging.info(report)
    return report


# TaskGroup DAG
taskgroup_dag = DAG(
    'taskgroup_simple_dag',
    default_args=default_args,
    description='Simple TaskGroup example',
    schedule='@daily',
    catchup=False,
    tags=['taskgroup', 'simple']
)

start = DummyOperator(task_id='start', dag=taskgroup_dag)

# Basic task group
with TaskGroup(group_id='extract_group', dag=taskgroup_dag) as extract_group:
    symbols = ['AAPL', 'GOOGL']
    for symbol in symbols:
        PythonOperator(
            task_id=f'extract_{symbol.lower()}',
            python_callable=extract_data,
            op_kwargs={'symbol': symbol},
            dag=taskgroup_dag
        )

with TaskGroup(group_id='process_group', dag=taskgroup_dag) as process_group:
    # Nested TaskGroup
    with TaskGroup(group_id='nested_group', dag=taskgroup_dag) as nested_group:
        for symbol in symbols:
            PythonOperator(
                task_id=f'process_{symbol.lower()}',
                python_callable=process_data,
                op_kwargs={'symbol': symbol},
                dag=taskgroup_dag
            )

    # Report task
    report_task = PythonOperator(
        task_id='generate_report',
        python_callable=generate_report,
        dag=taskgroup_dag
    )

    # Run the report after the nested group completes
    nested_group >> report_task

end = DummyOperator(task_id='end', dag=taskgroup_dag)

start >> extract_group >> process_group >> end
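Since the xcom_pull calls depend on those group-prefixed task ids, a quick sanity check (my addition; it assumes the DAG file is saved as taskgroup_simple_dag.py) is to import the file and print the fully qualified ids:

# Hypothetical check script, run next to the DAG file with Airflow installed.
from taskgroup_simple_dag import taskgroup_dag

for task_id in sorted(taskgroup_dag.task_ids):
    print(task_id)

# Expected output includes, e.g.:
#   extract_group.extract_aapl
#   process_group.generate_report
#   process_group.nested_group.process_aapl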




