Posts
Q&A
강의에서 사용하신 root.py 파일이 안 보여서 실습하면서 만든 테스트 공유 드려요
압축 파일을 윈도우 기본 압축 프로그램으로 풀었더니 실습 코드 폴더가 안 나오는 현상이 있었습니다! 실습 코드가 없어진 줄 알았는데, 실습 파일을 다시 잘 확인했습니다~
- Likes
- 0
- Comments
- 7
- Viewcount
- 142
Q&A
강의에서 사용하신 root.py 파일이 안 보여서 실습하면서 만든 테스트 공유 드려요
# Slack practice: post a Slack message whenever a task in this DAG fails.
from datetime import datetime, timedelta
import json
import logging

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook


def send_error_to_slack(context):
    """Send a failure notification for the failed task to Slack.

    Registered below as ``on_failure_callback``, so Airflow calls it with the
    failed task's context dict.

    Args:
        context: Airflow task context. Reads ``task_instance``, ``dag``,
            ``logical_date`` (falling back to the deprecated
            ``execution_date``) and ``exception``.
    """
    # Extract error details. Missing keys fall back to placeholders so the
    # callback itself never raises while reporting another failure.
    task_instance = context.get('task_instance')
    dag = context.get('dag')
    if dag is not None:
        dag_id = dag.dag_id
    else:
        dag_id = getattr(task_instance, 'dag_id', 'unknown')
    task_id = getattr(task_instance, 'task_id', 'unknown')
    # `execution_date` is deprecated in favour of `logical_date`.
    logical_date = context.get('logical_date') or context.get('execution_date')
    exception = context.get('exception')

    # Build the Slack message (legacy "attachments" layout).
    slack_message = {
        "text": "❌ Airflow Task Failed",
        "attachments": [
            {
                "color": "danger",
                "fields": [
                    {"title": "DAG", "value": dag_id, "short": True},
                    {"title": "Task", "value": task_id, "short": True},
                    {"title": "Time", "value": str(logical_date), "short": True},
                    {
                        "title": "Error Message",
                        "value": str(exception) if exception else "Unknown error",
                        "short": False,
                    },
                ],
            }
        ],
    }

    # Send to Slack. Delivery is best-effort: a failure here is logged, not
    # raised, so it does not mask the original task error.
    try:
        slack_hook = SlackWebhookHook(slack_webhook_conn_id='slack_default')
        slack_hook.send_dict(slack_message)
    except Exception:
        logging.exception("Failed to send Slack notification")
    else:
        logging.info("Error notification sent to Slack")


# Test callables
def task_that_fails():
    """Always raise, so the failure callback fires."""
    raise RuntimeError("This task intentionally fails for testing")


def task_that_succeeds():
    """Complete successfully (no notification is sent)."""
    logging.info("Task completed successfully")
    return "success"


# DAG configuration
default_args = {
    'owner': 'data-team',
    'start_date': datetime(2024, 1, 1),
    'retries': 0,  # no retries
    'on_failure_callback': send_error_to_slack,  # notify on failure only
}

dag = DAG(
    'slack_error_notification',
    default_args=default_args,
    description='에러 발생 시 Slack 알림',
    # `schedule` replaces the deprecated `schedule_interval` and matches the
    # other practice DAGs.
    schedule='@daily',
    catchup=False,
)

# Task definitions
fail_task = PythonOperator(
    task_id='failing_task',
    python_callable=task_that_fails,
    dag=dag,
)

success_task = PythonOperator(
    task_id='success_task',
    python_callable=task_that_succeeds,
    dag=dag,
)

# Execution order: the succeeding task runs first, then the failing one.
success_task >> fail_task
- Likes
- 0
- Comments
- 7
- Viewcount
- 142
Q&A
강의에서 사용하신 root.py 파일이 안 보여서 실습하면서 만든 테스트 공유 드려요
# Parallel: process generated stock records in batches on a thread pool.
import logging
import random
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator

default_args = {
    'owner': 'data-team',
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=2),
}


def generate_data(total_records=500):
    """Generate ``total_records`` random stock records.

    Each record is a dict with ``id`` (1-based), ``symbol`` (one of
    AAPL/GOOGL/MSFT), ``price`` (uniform 100-300, 2 decimals) and
    ``volume`` (integer 1000-5000).
    """
    symbols = ['AAPL', 'GOOGL', 'MSFT']
    data = [
        {
            'id': i + 1,
            'symbol': random.choice(symbols),
            'price': round(random.uniform(100, 300), 2),
            'volume': random.randint(1000, 5000),
        }
        for i in range(total_records)
    ]
    logging.info(f"데이터 생성: {len(data)}건")
    return data


def create_batches(data, batch_size):
    """Split ``data`` into consecutive slices of at most ``batch_size`` items.

    Raises:
        ValueError: if ``batch_size`` is not positive.
    """
    if batch_size <= 0:
        raise ValueError(f"batch_size must be positive, got {batch_size}")
    batches = [data[i:i + batch_size] for i in range(0, len(data), batch_size)]
    logging.info(f"배치 생성: {len(batches)}개")
    return batches


def process_batch(batch_data, batch_id):
    """Summarise one batch: its record count and total ``volume``."""
    time.sleep(1)  # placeholder for the real per-batch data work
    total_volume = sum(record['volume'] for record in batch_data)
    result = {
        'batch_id': batch_id,
        'count': len(batch_data),
        'total_volume': total_volume,
    }
    logging.info(f"배치 {batch_id} 완료")
    return result


def parallel_processing(total_records=500, batch_size=50, max_workers=3):
    """Generate data, split it into batches and process them concurrently.

    Args:
        total_records: number of records to generate.
        batch_size: maximum records per batch.
        max_workers: thread-pool size.

    Returns:
        dict with ``time`` (elapsed seconds, 2 decimals), ``batches`` (batch
        count) and ``results`` (per-batch summaries, in batch order).
    """
    # perf_counter is a monotonic clock meant for measuring elapsed time.
    start_time = time.perf_counter()
    data = generate_data(total_records)
    batches = create_batches(data, batch_size)

    # Store each result at its batch index so the output order is stable
    # regardless of which thread finishes first.
    results = [None] * len(batches)
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(process_batch, batch, f"batch_{i + 1}"): i
            for i, batch in enumerate(batches)
        }
        for future in as_completed(futures):
            results[futures[future]] = future.result()

    processing_time = round(time.perf_counter() - start_time, 2)
    logging.info(f"병렬 처리 완료: {processing_time}초, 총 {len(results)}개 배치")
    return {
        'time': processing_time,
        'batches': len(batches),
        'results': results,
    }


parallel_dag = DAG(
    'parallel_processing_dag',
    default_args=default_args,
    description='병렬 처리 예제',
    schedule='@daily',
    catchup=False,
    tags=['parallel'],
)

# EmptyOperator replaces the deprecated DummyOperator.
start = EmptyOperator(task_id='start', dag=parallel_dag)

parallel_task = PythonOperator(
    task_id='parallel_task',
    python_callable=parallel_processing,
    dag=parallel_dag,
)

end = EmptyOperator(task_id='end', dag=parallel_dag)

start >> parallel_task >> end
- Likes
- 0
- Comments
- 7
- Viewcount
- 142
Q&A
강의에서 사용하신 root.py 파일이 안 보여서 실습하면서 만든 테스트 공유 드려요
# Custom Operator: compare a custom BaseOperator subclass with a plain PythonOperator.
import logging
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import BaseOperator
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator

from db_utils import execute_query

# Basic settings
default_args = {
    'owner': 'data-team',
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=2),
}

custom_operator_dag = DAG(
    'custom_operator_dag',
    default_args=default_args,
    description='Custom Operator 간단예제',
    schedule='@daily',
    catchup=False,
    tags=['custom-operator', 'simple'],
)

# EmptyOperator replaces the deprecated DummyOperator.
start = EmptyOperator(task_id='start', dag=custom_operator_dag)


# ================= Custom Operator =================
class StockDataOperator(BaseOperator):
    """Custom operator that queries summary stock data for one symbol.

    Args:
        symbol: ticker symbol to filter ``stock_prices`` on.
        days: look-back window in days, counted back from ``CURDATE()``.
    """

    # `@apply_defaults` is no longer needed: since Airflow 2.0, BaseOperator
    # applies default_args automatically and the decorator is deprecated.
    def __init__(self, symbol, days=7, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.symbol = symbol
        self.days = days

    def execute(self, context):
        """Run the query and return a summary dict.

        Returns:
            dict with ``symbol``, ``count`` and ``avg_price``. Returns
            ``count`` 0 and ``avg_price`` 0.0 when the query returns no rows.
        """
        logging.info(f"{self.symbol} 주식 데이터 조회 시작")

        query = """
            SELECT symbol, COUNT(*) as count, AVG(price) as avg_price
            FROM stock_prices
            WHERE symbol = %s
              AND trade_date >= CURDATE() - INTERVAL %s DAY
            GROUP BY symbol
        """
        result = execute_query(query, [self.symbol, self.days])

        if not result:
            logging.warning(f"{self.symbol} 데이터가 없습니다")
            return {'symbol': self.symbol, 'count': 0, 'avg_price': 0.0}

        symbol, count, avg_price = result[0]
        data = {
            'symbol': symbol,
            'count': count,
            # AVG() may be NULL; otherwise coerce the driver's numeric type
            # (possibly Decimal, driver-dependent) to float.
            'avg_price': float(avg_price) if avg_price is not None else 0.0,
        }
        logging.info(f"{self.symbol} 조회완료 {data}")
        return data


# Using the custom operator
custom_task = StockDataOperator(
    task_id='extract_stock_data',
    symbol='AAPL',  # fixed typo: was 'APPL'
    days=30,
    dag=custom_operator_dag,
)


# ================= Traditional PythonOperator =================
# For comparison with the traditional PythonOperator approach.
def traditional_function(**context):
    """Count all AAPL rows in ``stock_prices`` via a plain Python callable.

    Returns:
        dict with ``symbol`` and ``count`` (0 when the query returns nothing).
    """
    logging.info("기존 PythonOperator 방식")
    symbol = 'AAPL'
    # Bind the symbol as a parameter, as the custom operator does.
    query = "SELECT COUNT(*) FROM stock_prices WHERE symbol = %s"
    result = execute_query(query, [symbol])
    count = result[0][0] if result else 0
    return {'symbol': symbol, 'count': count}


traditional_task = PythonOperator(
    task_id='traditional_approach',
    python_callable=traditional_function,
    dag=custom_operator_dag,
)

end = EmptyOperator(task_id='end', dag=custom_operator_dag)

# Task dependencies
start >> [custom_task, traditional_task] >> end
- Likes
- 0
- Comments
- 7
- Viewcount
- 142
Q&A
강의에서 사용하신 root.py 파일이 안 보여서 실습하면서 만든 테스트 공유 드려요
# TaskGroup: group extract/process tasks (with a nested group) and build a report.
import logging
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup

from db_utils import execute_query

# Symbols handled by every group. Shared so that task ids and the XCom
# lookups built from them stay in sync.
SYMBOLS = ['AAPL', 'GOOGL']

# Basic settings
default_args = {
    'owner': 'data-team',
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=2),
}


def extract_data(symbol, **context):
    """Count the ``stock_prices`` rows for ``symbol``.

    Returns:
        dict with ``symbol`` and ``count`` (0 when the query returns nothing).
    """
    logging.info(f"{symbol} 데이터 추출")
    query = "SELECT COUNT(*) FROM stock_prices WHERE symbol = %s"
    result = execute_query(query, [symbol])
    count = result[0][0] if result else 0
    logging.info(f"{symbol}: {count}건 추출")
    return {'symbol': symbol, 'count': count}


def process_data(symbol, **context):
    """Mark ``symbol`` as processed or empty, based on its extract task's XCom.

    Returns:
        dict with ``symbol``, ``count`` and ``status`` ('processed' or 'empty').
    """
    logging.info(f"{symbol} 데이터 처리")
    ti = context['ti']  # Task Instance (TI)
    # Task ids inside a TaskGroup are prefixed with the group id.
    extracted = ti.xcom_pull(task_ids=f'extract_group.extract_{symbol.lower()}')
    if extracted:
        count = extracted['count']
    else:
        # xcom_pull returns None when nothing was pushed; treat as no rows
        # instead of failing with a TypeError on None['count'].
        logging.warning(f"No extract result found for {symbol}; treating as empty")
        count = 0
    processed = {
        'symbol': symbol,
        'count': count,
        'status': 'processed' if count > 0 else 'empty',
    }
    logging.info(f"{symbol} 처리 완료: {processed}")
    return processed


def generate_report(**context):
    """Sum the processed counts of every symbol into a one-line report string."""
    logging.info("리포트 생성 시작")
    ti = context['ti']
    total_count = 0
    for symbol in SYMBOLS:
        processed = ti.xcom_pull(
            task_ids=f'process_group.nested_group.process_{symbol.lower()}'
        )
        if processed:
            total_count += processed['count']
    report = f"총 {total_count}건 데이터 처리 완료"
    logging.info(report)
    return report


# TaskGroup DAG
taskgroup_dag = DAG(
    'taskgroup_simple_dag',
    default_args=default_args,
    description='TaskGroup 간단 예제',
    schedule='@daily',
    catchup=False,
    tags=['taskgroup', 'simple'],
)

# EmptyOperator replaces the deprecated DummyOperator.
start = EmptyOperator(task_id='start', dag=taskgroup_dag)

# Basic TaskGroup: one extract task per symbol.
with TaskGroup(group_id='extract_group', dag=taskgroup_dag) as extract_group:
    for symbol in SYMBOLS:
        PythonOperator(
            task_id=f'extract_{symbol.lower()}',
            python_callable=extract_data,
            op_kwargs={'symbol': symbol},
            dag=taskgroup_dag,
        )

with TaskGroup(group_id='process_group', dag=taskgroup_dag) as process_group:
    # Nested TaskGroup: ids become 'process_group.nested_group.<task_id>'.
    with TaskGroup(group_id='nested_group', dag=taskgroup_dag) as nested_group:
        for symbol in SYMBOLS:
            PythonOperator(
                task_id=f'process_{symbol.lower()}',
                python_callable=process_data,
                op_kwargs={'symbol': symbol},
                dag=taskgroup_dag,
            )

    # Report task, kept inside process_group so `process_group >> end`
    # also waits for the report.
    report_task = PythonOperator(
        task_id='generate_report',
        python_callable=generate_report,
        dag=taskgroup_dag,
    )

    # Run the report once the nested group has finished.
    nested_group >> report_task

end = EmptyOperator(task_id='end', dag=taskgroup_dag)

start >> extract_group >> process_group >> end
- Likes
- 0
- Comments
- 7
- Viewcount
- 142




