묻고 답해요
164만명의 커뮤니티!! 함께 토론해봐요.
인프런 TOP Writers
-
미해결Airflow 마스터 클래스
plugins 폴더 생성
안녕하세요!plugins 폴더 생성은 단순히 airflow/plugins/shell 각 폴더 생성해서, sh 파일을 넣어서 수업 부분 진행하면 될까요?위 방법으로 했는데 선생님과 다르게, wsl에서 airflow 경로 안에 plugins가 안보이네요ㅠ
-
미해결Airflow 마스터 클래스
WSL에서 git push 가 안되요 ㅠ
실습내용을 열심히 따라가다 보니 WSL에서 git push하려면 토큰 방식으로 생성된 비번을 입력하라고 하셨는데 하다 보니 아래 캡처와 같이 계속 에러가 나네요. 따라하다 무엇을 잘못한 걸까요? ㅠㅠ 참고로 토큰값 생성은 Tokens(classic)으로도 해보고, Fine-grained tokens로도 생성해서 해 봤습니다. 둘 다 안되네요. ㅠ
-
미해결Airflow 마스터 클래스
chatGPT&Airflow로 블로그 자동 포스팅하기 는 Deprecated 가 필요합니다.
2024년 2월 이후로 Tistory OpenAPI 서비스 종료가 되었다고 합니다. https://tistory.github.io/document-tistory-apis/
-
미해결Airflow 마스터 클래스
github에 회원가입이 안되는데 원인이 뭔지 모르겠어요 ㅠ
캡차 응답을 확인할 수 없습니다. 문제 해결 정보는 https://docs.github.com/articles/troubleshooting-connectivity-problems/#troubleshooting-the-captcha 를 참조하세요.위와 같이 메시지가 뜨는데 회사나 기관의 네트워크을 사용하라고 하는데 꼭 고정IP를 사용해야 하는 건가 해서요. 집이나 도서관 등 유동IP는 사용이 불가능한 건가요?
-
미해결실리콘밸리 데이터 리더가 알려주는 Airflow 기초
자료 다운로드 하면 링크가 모두 클릭이 안됨
자료 다운로드 하면 링크가 모두 클릭이 안됩니다.모든 강의 자료 PDF로 다운로드되며, 하이퍼링크처럼 표시만 되고 클릭 불가합니다. 링크는 다른곳에 따로 올리시는 걸까요?
-
해결됨실리콘밸리 엔지니어와 함께하는 Apache Airflow
apache airflow 설치하기 질문
안녕하세요mac에서 apache 설치하려고 하는데, Apache Airflow 설치하기 강의가learn-apache-airflow-main.zip파일 기반으로 진행되는 걸까요?일단 강의환경에 맞춰서 python 3.10 & airflow 2.6.2 로 가상환경 생성해서 하고있는데 강의에서보이는 파일목록이 따로 생성되지 않아서 문의드립니다..
-
해결됨토스 개발자와 함께하는 Data Workflow Management 기반의 대용량 데이터 처리 설계 패턴
강의에서사용하신 root.py 파일이 안보여서 실습하면서 만든 텍스트 공유 드려요
dynamicDagfrom datetime import datetime, timedelta from airflow import DAG from airflow.operators.python import PythonOperator from db_utils import execute_query import logging def log_task_start(message): logging.info(f"[START] {message}") def log_task_end(message): logging.info(f"[END] {message}") # 분석할 주식 종목 리스트 (설정으로 관리) STOCK_SYMBOLS = ['AAPL','GOOGL','MSFT','AMZN','TSLA'] default_args = { 'owner': 'data-team', 'depends_on_past': False, 'start_date': datetime(2024, 1, 1), 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } def analyze_stock_data(symbol, **context): """주식 종목에 대해서, 최근 30일간의 주식 데이터를 분석""" log_task_start(f"주식 분석 - {symbol}") query = """ SELECT symbol, AVG(price) as avg_price, MAX(price) as max_price, MIN(price) as min_price, SUM(volume) as total_volume, COUNT(*) as trading_days FROM stock_prices WHERE symbol = %s AND trade_date >= DATE_SUB(CURDATE(), INTERVAL 30 DAY) GROUP BY symbol """ result = execute_query(query, [symbol]) if result: data = result[0] print(f"{symbol} 분석 결과:") print(f" 평균 가격: ${data[1]:.2f}") print(f" 최고 가격: ${data[2]:.2f}") print(f" 최저 가격: ${data[3]:.2f}") print(f" 총 거래량: {data[4]:,}") print(f" 거래일수: {data[5]}일") log_task_end(f"주식 분석 - {symbol}") return result def create_stock_dag(symbol): """개별 주식에 대한 DAG 생성 함수""" dag_id = f'stock_analysis_{symbol.lower()}' dag = DAG( dag_id, default_args=default_args, description=f'{symbol} 주식 데이터 분석 DAG', schedule='@daily', catchup=False, tags=['dynamic', 'stock', 'analysis', symbol] ) # 데이터 품질 체크 태스크 def check_data_quality(**context): query = """ SELECT COUNT(*) as count FROM stock_prices WHERE symbol = %s AND trade_date = CURDATE() - INTERVAL 1 DAY """ result = execute_query(query, [symbol]) logging.info(f"{symbol} 데이터 품질 체크: {result[0][0] if result else 0}") return result data_quality_check = PythonOperator( task_id=f'check_data_quality_{symbol.lower()}', python_callable=check_data_quality, dag=dag ) # 주식 분석 태스크 analyze_task = PythonOperator( task_id=f'analyze_{symbol.lower()}', python_callable=analyze_stock_data, op_kwargs={'symbol': symbol}, dag=dag ) # 결과 저장 태스크 def save_results(**context): query = """ INSERT INTO daily_stats (symbol, summary_date, avg_price, total_volume, trade_count) SELECT %s, CURDATE() - INTERVAL 1 DAY, AVG(price), SUM(volume), COUNT(*) FROM stock_prices WHERE symbol = %s AND trade_date = CURDATE() - INTERVAL 1 DAY ON DUPLICATE KEY UPDATE avg_price = VALUES(avg_price), total_volume = VALUES(total_volume), trade_count = VALUES(trade_count) """ result = execute_query(query, [symbol, symbol]) logging.info(f"{symbol} 분석 결과 저장 완료") return result save_results = PythonOperator( task_id=f'save_analysis_{symbol.lower()}', python_callable=save_results, dag=dag ) # 태스크 의존성 설정 data_quality_check >> analyze_task >> save_results return dag # Dynamic DAG 생성 # 각 주식 종목에 대해 별도의 DAG 생성 for symbol in STOCK_SYMBOLS: dag_id = f'stock_analysis_{symbol.lower()}' globals()[dag_id] = create_stock_dag(symbol)CrossDag (트리거 센서)from datetime import datetime, timedelta from airflow import DAG, Dataset from airflow.operators.python import PythonOperator from airflow.operators.trigger_dagrun import TriggerDagRunOperator from airflow.sensors.external_task import ExternalTaskSensor from db_utils import execute_query import logging #기본 설정 default_args = { 'owner': 'data-team', 'start_date': datetime(2024, 1, 1), 'retries': 1, 'retry_delay': timedelta(minutes=2), } # ======================================================== # TriggerDagRunOperator # ======================================================== def process_data(**context): """간단한 데이터 처리""" logging.info("데이터 처리 시작") query = "SELECT COUNT(*) FROM stock_prices" result = execute_query(query) count = result[0][0] if result else 0 logging.info(f"처리된 레코드 수 : ${count}") return {"record_count": count} trigger_dag = DAG( 'trigger_example_dag', default_args=default_args, description='TriggerDagRunOperator 예제', schedule='@daily', catchup=False, tags=['cross-dag', 'trigger'] ) process_task = PythonOperator( task_id='process_data', python_callable=process_data, dag=trigger_dag ) #다른 DAG를 트리거 (가장 기본적인 사용법) trigger_next_dag = TriggerDagRunOperator( task_id='trigger_sensor_dag', trigger_dag_id='sensor_example_dag', wait_for_completion=False, # 완료를 기다리지 않음 dag=trigger_dag ) process_task >> trigger_next_dag # ======================================================== # ExternalTaskSensor # ======================================================== # 두 번째 DAG: 외부 태스크 대기 def analyze_triggered_data(**context): """트리거된 후 분석 작업""" logging.info("외부 태스크 완료 후 분석 시작") # 간단한분석로직 analysis_result = {"status": "completed", "timestamp": str(datetime.now())} logging.info(f"분석 완료: {analysis_result}") return analysis_result sensor_dag = DAG( 'sensor_example_dag', default_args=default_args, description='ExternalTaskSensor 예제', schedule=None, # 트리거로만 실행됨 catchup=False, tags=['cross-dag', 'sensor'] ) # 외부 DAG의 태스크 완료를 기다림 def get_most_recent_dag_run(dt): """가장 최근의 DAG 실행을 찾는 함수""" from airflow.models import DagRun from airflow.utils.session import provide_session @provide_session def get_recent_run(session=None): recent_run = session.query(DagRun).filter( DagRun.dag_id == 'trigger_example_dag', DagRun.state == 'success' ).order_by(DagRun.execution_date.desc()).first() if recent_run: return recent_run.execution_date return dt return get_recent_run() wait_for_external_task = ExternalTaskSensor( task_id='wait_for_process_data', external_dag_id='trigger_example_dag', # 기다릴 DAG external_task_id='process_data', # 기다릴 태스크 execution_date_fn=get_most_recent_dag_run, #최근성공한 실행 DAG timeout=60, # 1분 타임아웃 poke_interval=5, # 5초마다 확인 allowed_states=['success'], #성공 상태만 기다림 dag=sensor_dag ) analyze_task = PythonOperator( task_id='analyze_triggered_data', python_callable=analyze_triggered_data, dag=sensor_dag ) wait_for_external_task >> analyze_task # ======================================================== # Dataset Denpendencies # ======================================================== # Dataset 정의 (Dataset Dependencies 용) market_data_dataset = Dataset("market_data") analysis_dataset = Dataset("analysis_result") def create_market_data(**context): """마켓 데이터를 생성하고 Dataset을 업데이트""" logging.info("마켓 데이터 생성 시작") # 실제 데이터 생성 로직 query = """ SELECT symbol, AVG(price) as avg_price FROM stock_prices WHERE trade_date = CURDATE() - INTERVAL 1 DAY GROUP BY symbol LIMIT 3 """ result = execute_query(query) market_data = [] if result: for row in result: market_data.append({"symbol": row[0], "avg_price": float(row[1])}) logging.info(f"생성된 마켓 데이터: {market_data}") return market_data dataset_producer_dag = DAG( 'dataset_producer_dag', default_args=default_args, description='Dataset Dependencies - 생산자', schedule='@daily', catchup=False, tags=['cross-dag', 'dataset', 'producer'] ) create_data_task = PythonOperator( task_id='create_market_data', python_callable=create_market_data, outlets=[market_data_dataset], # Dataset 업데이트 알림 dag=dataset_producer_dag ) # Consumer def consume_market_data(**context): """생성된 마켓 데이터를 소비하여 분석""" logging.info("마켓 데이터 소비 및 분석 시작") # Dataset이 업데이트되면 자동으로 실행됨 query = "SELECT COUNT(DISTINCT symbol) FROM stock_prices" result = execute_query(query) symbol_count = result[0][0] if result else 0 analysis = { "total_symbols": symbol_count, "analysis_time": str(datetime.now()), "status": "Dataset 기반 분석 완료" } logging.info(f"Dataset 기반 분석 결과: {analysis}") return analysis def generate_final_report(**context): """최종 리포트 생성""" logging.info("최종 리포트 생성") ti = context['ti'] analysis = ti.xcom_pull(task_ids='consume_market_data') report = f"Dataset 기반 리포트: {analysis.get('total_symbols', 0)}개 심볼 분석 완료" logging.info(report) return report dataset_consumer_dag = DAG( 'dataset_consumer_dag', default_args=default_args, description='Dataset Dependencies - 소비자', schedule=[market_data_dataset], # Dataset이 업데이트되면 실행 catchup=False, tags=['cross-dag', 'dataset', 'consumer'] ) consume_task = PythonOperator( task_id='consume_market_data', python_callable=consume_market_data, outlets=[analysis_dataset], # 다음 Dataset 업데이트 dag=dataset_consumer_dag ) report_task = PythonOperator( task_id='generate_final_report', python_callable=generate_final_report, dag=dataset_consumer_dag ) consume_task >> report_task
-
미해결실리콘밸리 데이터 리더가 알려주는 Airflow 기초
48강 강의 여전히 49강과 같은 강의가 나옵니다
48. Postgres 테이블 읽어오기 강의가 여전히 49 49. Postgres 테이블 읽어오기와 같은 강의가 나옵니다. 이론 강의가 아닌 실습강의가 나오고 있습니다.
-
미해결Airflow 마스터 클래스
설치중인데 venv를 꼭 써야할까요?
venv로 해서 하니까 이전에 있었던 라이브러리들이 안먹혀서요? 문제는 없나요?
-
미해결Airflow 마스터 클래스
설치 버전 관련 질문입니다.
도커를 사용한 것은 3.1.3 버전으로 다운 받고, 파이썬 3.9 버전에서 다운 받는 것은 3.0.4 버전으로 다운받았는데 혹시 문제 발생할 것이 있을까요?
-
미해결Airflow 마스터 클래스
우분투 버전 다운받아야하는데 어떤걸로 설치해야할까요?
위와 같이 설치된 배포판이 없어서 list 검색하니 여러 배포판 리스트가 나오는데요~지금시점에서 어떤 버전을 설치해야할지 말씀해주시면 좋을 거 같습니다~~!
-
미해결실리콘밸리 데이터 리더가 알려주는 Airflow 기초
forloop으로 task 정의시 task_id 정해지는 로직
안녕하세요. 49강 수강 중 질문이 있습니다. for table_name in TABLES.keys(): extract_from_postgres(postgres_schema, table_name) >> load_to_snowflake(snowflake_schema, table_name)현재 강사님이 주신 이 코드 기준 테이블 2개 tasks 2개 해서 총 4개의 tasks가 airflow tasks list의 결과로 반환되었는데요, 이때의 결과물인 tasks_id가 어떻게 만들어지는지 궁금합니다.조금 더 정확히는 forloop으로 task를 정의할 때 어떤 식으로 DAG가 이 task의 개수를 세고 네이밍을 하는지 궁금합니다.혼자 테스트를 해보고 싶어서 임의의 테이블 하나를 postgres:production에 추가하고 코드내부의 TABLES 딕셔너리에 제가 추가한 테이블의 스키마를 추가하였습니다. 이때 테이블이 총 3개가 되었으므로 airflow tasks list의 결과가 총 6개가 될 것으로 예상하였는데 여전히 4개로 나옵니다. 제가 놓친 부분이 있을까요?현재 production schema아래 3개의 테이블이 있는 상태입니다.airflow=# SELECT table_name FROM information_schema.tables WHERE table_schema = 'production' AND table_type = 'BASE TABLE' ORDER BY table_name; table_name ------------------------ session_timestamp user_session_channel user_session_channel_2 (3 rows)감사합니다. 학습 관련 질문을 상세하게 남겨주시면 더 좋습니다. 예를 들어 이해가 안 가는 부분이 있다고 하면 강의에서 어느 부분인지 어떤 부분이 이해가 안되는지 등등 추가 정보가 큰 도움이 됩니다. 그리고 에러가 난다면 어떤 에러 메시지가 나오는지 같이 공유해주세요. 혹시라도 유사한 질문이 있었는지 먼저 확인 부탁 드리겠습니다. 서로 예의를 지키며 존중하는 문화를 만들어갔으면 하고 인프런 서비스 운영 관련 문의는 1:1 문의하기를 이용해주세요.
-
미해결Airflow 마스터 클래스
DAG 만들기 중 airflow 패키지 로드 에러
안녕하세요 강사님, 강의 잘 수강하고 있습니다. Airflow 패키지 설치 이후 DAG 만들기 실습을 진행하려고 하는데 VSCode 환경에서 코드 입력 시 패키지를 로드하는 다음 코드에서 오류가 출력됩니다:Import "airflow.providers.standard.operators.bash" could not be resolvedPylancereportMissingImports cmd 터미널에서 (venv) 확인하고 airflow 설치 진행했는데, airflow, pendulum 등 관련 패키지를 인식하지 못하고 있는 것 같습니다. 오류 해결 위해 추가로 어떤 부분 확인해볼 수 있을지 도움 주시면 감사드리겠습니다. 그리고 VSCode 시작해서 처음 오픈되는 powershell 터미널에서 붉은 글씨로 보안 오류 (UnauthorizedAccess)가 출력되는데 이 오류는 대처할 필요는 없을까요? 확인 부탁드립니다. 감사합니다!
-
해결됨실리콘밸리 엔지니어와 함께하는 Apache Airflow
postgres_loader DAG 에러
해당 강의에서 진행한 postgres_loader DAG를 돌리면 자꾸 sample_table이 존재하지 않는다는 에러가 발생해서 질문 남깁니다.도커에서 설치해서 사용하고 있고 postgres 설치가 잘못된 것 같은데 다른 질문들 보면서 이것저것수정을 해서 더 잘 모르겠습니다 ㅠㅠ +) postgres 설치부터 잘못된 것 같은데.. 해결방법 좀 알려주세요!
-
미해결Airflow 마스터 클래스
3.0에서도 수업노트가 성립하는지 확인 부탁드립니다!
뒷 강의(Bash Operator에서 Jinja template 사용하기)의 수업 노트에 다음의 내용이 나오는데요.이 강의의 수업노트가 잘 이해되지 않았었는데 관련이 있을까요?data_interval_start 값과 data_interval_end 값이 동일하게 나오도록 변경되었습니다.
-
미해결Airflow 마스터 클래스
task_id 사용법이 뭔가 바뀐 것 같습니다.
@task만 쓰거나inner 함수에 return을 하거나multiple_outputs=True 옵션(이건 안해봄)으로 해결되는 것 같습니다. 로그는 다음과 같습니다.Timestamp: 2025-11-01 19:54:50 Traceback (most recent call last): File "<attrs generated methods airflow.sdk.bases.decorator._TaskDecorator>", line 37, in init _setattr('multiple_outputs', __attr_factory_multiple_outputs(self)) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/bases/decorator.py", line 328, in _infer_multiple_outputs if "return" not in self.function.annotations: ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ AttributeError: 'str' object has no attribute 'annotations'. Did you mean: 'contains'?
-
해결됨실리콘밸리 엔지니어와 함께하는 Apache Airflow
Queue 강의를 듣고 난 후에 대한 질의
Queue에 대한 질문이 몇가지가 있습니다. 큐를 지정할 때는 무조건 큐의 이름을 지정해 줘야 하나요? 자동으로 비어 있는 woker에 큐를 할당하는 방법은 없나요?cpu_intensive라는 woker에 여러개의 큐가 동시에 요청이 왔을 경우 동기적으로 처리하나요?큐를 생성하면 해당 큐의 물리적 자원은 어떻게 할당 되는 것인가요?대체로 하나의 DAG에서 강의에 예시와 같이 여러 개의 큐를 사용하는 경우가 있을까요?워커를 많이 만들어 환경을 구성하는 사례는 어떤 사례가 있는지 알 수 있을까요? 내용 확인 부탁드립니다.
-
미해결Airflow 마스터 클래스
email operator 오류 관련
다음과 같이 진행했는데 dag을 돌리면 밑에 캡쳐처럼 오류가 나네용 왜그럴까용??
-
미해결Airflow 마스터 클래스
plugins 폴더 관련
이 부분 진행할 때 airflow 폴더에 plugins가 있던데 수업에서 만들었던적이 있을까요..? 듣고있는데 제 폴더에는 plugins가 없어서용! 따로 만들어야 하는건지 제가 놓친건지 궁금합니다
-
미해결Airflow 마스터 클래스
bash operator 관련 문의입니다
airflow에서 example_complex에 dag 코드가 이렇게 적혀있던데수업꺼랑 좀 코드 차이가 있어서요!스케쥴이나 import하는거나 변경을 어떻게해야할지 몰라서 문의드립니다