묻고 답해요
161만명의 커뮤니티!! 함께 토론해봐요.
인프런 TOP Writers
-
해결됨실리콘밸리 엔지니어와 함께하는 Apache Airflow
apache airflow 설치하기 질문
안녕하세요mac에서 apache 설치하려고 하는데, Apache Airflow 설치하기 강의가learn-apache-airflow-main.zip파일 기반으로 진행되는 걸까요?일단 강의환경에 맞춰서 python 3.10 & airflow 2.6.2 로 가상환경 생성해서 하고있는데 강의에서보이는 파일목록이 따로 생성되지 않아서 문의드립니다..
-
미해결토스 개발자와 함께하는 Data Workflow Management 기반의 대용량 데이터 처리 설계 패턴
강의에서사용하신 root.py 파일이 안보여서 실습하면서 만든 텍스트 공유 드려요
dynamicDagfrom datetime import datetime, timedelta from airflow import DAG from airflow.operators.python import PythonOperator from db_utils import execute_query import logging def log_task_start(message): logging.info(f"[START] {message}") def log_task_end(message): logging.info(f"[END] {message}") # 분석할 주식 종목 리스트 (설정으로 관리) STOCK_SYMBOLS = ['AAPL','GOOGL','MSFT','AMZN','TSLA'] default_args = { 'owner': 'data-team', 'depends_on_past': False, 'start_date': datetime(2024, 1, 1), 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } def analyze_stock_data(symbol, **context): """주식 종목에 대해서, 최근 30일간의 주식 데이터를 분석""" log_task_start(f"주식 분석 - {symbol}") query = """ SELECT symbol, AVG(price) as avg_price, MAX(price) as max_price, MIN(price) as min_price, SUM(volume) as total_volume, COUNT(*) as trading_days FROM stock_prices WHERE symbol = %s AND trade_date >= DATE_SUB(CURDATE(), INTERVAL 30 DAY) GROUP BY symbol """ result = execute_query(query, [symbol]) if result: data = result[0] print(f"{symbol} 분석 결과:") print(f" 평균 가격: ${data[1]:.2f}") print(f" 최고 가격: ${data[2]:.2f}") print(f" 최저 가격: ${data[3]:.2f}") print(f" 총 거래량: {data[4]:,}") print(f" 거래일수: {data[5]}일") log_task_end(f"주식 분석 - {symbol}") return result def create_stock_dag(symbol): """개별 주식에 대한 DAG 생성 함수""" dag_id = f'stock_analysis_{symbol.lower()}' dag = DAG( dag_id, default_args=default_args, description=f'{symbol} 주식 데이터 분석 DAG', schedule='@daily', catchup=False, tags=['dynamic', 'stock', 'analysis', symbol] ) # 데이터 품질 체크 태스크 def check_data_quality(**context): query = """ SELECT COUNT(*) as count FROM stock_prices WHERE symbol = %s AND trade_date = CURDATE() - INTERVAL 1 DAY """ result = execute_query(query, [symbol]) logging.info(f"{symbol} 데이터 품질 체크: {result[0][0] if result else 0}") return result data_quality_check = PythonOperator( task_id=f'check_data_quality_{symbol.lower()}', python_callable=check_data_quality, dag=dag ) # 주식 분석 태스크 analyze_task = PythonOperator( task_id=f'analyze_{symbol.lower()}', python_callable=analyze_stock_data, op_kwargs={'symbol': symbol}, dag=dag ) # 결과 저장 태스크 def save_results(**context): query = """ INSERT INTO daily_stats (symbol, summary_date, avg_price, total_volume, trade_count) SELECT %s, CURDATE() - INTERVAL 1 DAY, AVG(price), SUM(volume), COUNT(*) FROM stock_prices WHERE symbol = %s AND trade_date = CURDATE() - INTERVAL 1 DAY ON DUPLICATE KEY UPDATE avg_price = VALUES(avg_price), total_volume = VALUES(total_volume), trade_count = VALUES(trade_count) """ result = execute_query(query, [symbol, symbol]) logging.info(f"{symbol} 분석 결과 저장 완료") return result save_results = PythonOperator( task_id=f'save_analysis_{symbol.lower()}', python_callable=save_results, dag=dag ) # 태스크 의존성 설정 data_quality_check >> analyze_task >> save_results return dag # Dynamic DAG 생성 # 각 주식 종목에 대해 별도의 DAG 생성 for symbol in STOCK_SYMBOLS: dag_id = f'stock_analysis_{symbol.lower()}' globals()[dag_id] = create_stock_dag(symbol)CrossDag (트리거 센서)from datetime import datetime, timedelta from airflow import DAG, Dataset from airflow.operators.python import PythonOperator from airflow.operators.trigger_dagrun import TriggerDagRunOperator from airflow.sensors.external_task import ExternalTaskSensor from db_utils import execute_query import logging #기본 설정 default_args = { 'owner': 'data-team', 'start_date': datetime(2024, 1, 1), 'retries': 1, 'retry_delay': timedelta(minutes=2), } # ======================================================== # TriggerDagRunOperator # ======================================================== def process_data(**context): """간단한 데이터 처리""" logging.info("데이터 처리 시작") query = "SELECT COUNT(*) FROM stock_prices" result = execute_query(query) count = result[0][0] if result else 0 logging.info(f"처리된 레코드 수 : ${count}") return {"record_count": count} trigger_dag = DAG( 'trigger_example_dag', default_args=default_args, description='TriggerDagRunOperator 예제', schedule='@daily', catchup=False, tags=['cross-dag', 'trigger'] ) process_task = PythonOperator( task_id='process_data', python_callable=process_data, dag=trigger_dag ) #다른 DAG를 트리거 (가장 기본적인 사용법) trigger_next_dag = TriggerDagRunOperator( task_id='trigger_sensor_dag', trigger_dag_id='sensor_example_dag', wait_for_completion=False, # 완료를 기다리지 않음 dag=trigger_dag ) process_task >> trigger_next_dag # ======================================================== # ExternalTaskSensor # ======================================================== # 두 번째 DAG: 외부 태스크 대기 def analyze_triggered_data(**context): """트리거된 후 분석 작업""" logging.info("외부 태스크 완료 후 분석 시작") # 간단한분석로직 analysis_result = {"status": "completed", "timestamp": str(datetime.now())} logging.info(f"분석 완료: {analysis_result}") return analysis_result sensor_dag = DAG( 'sensor_example_dag', default_args=default_args, description='ExternalTaskSensor 예제', schedule=None, # 트리거로만 실행됨 catchup=False, tags=['cross-dag', 'sensor'] ) # 외부 DAG의 태스크 완료를 기다림 def get_most_recent_dag_run(dt): """가장 최근의 DAG 실행을 찾는 함수""" from airflow.models import DagRun from airflow.utils.session import provide_session @provide_session def get_recent_run(session=None): recent_run = session.query(DagRun).filter( DagRun.dag_id == 'trigger_example_dag', DagRun.state == 'success' ).order_by(DagRun.execution_date.desc()).first() if recent_run: return recent_run.execution_date return dt return get_recent_run() wait_for_external_task = ExternalTaskSensor( task_id='wait_for_process_data', external_dag_id='trigger_example_dag', # 기다릴 DAG external_task_id='process_data', # 기다릴 태스크 execution_date_fn=get_most_recent_dag_run, #최근성공한 실행 DAG timeout=60, # 1분 타임아웃 poke_interval=5, # 5초마다 확인 allowed_states=['success'], #성공 상태만 기다림 dag=sensor_dag ) analyze_task = PythonOperator( task_id='analyze_triggered_data', python_callable=analyze_triggered_data, dag=sensor_dag ) wait_for_external_task >> analyze_task # ======================================================== # Dataset Denpendencies # ======================================================== # Dataset 정의 (Dataset Dependencies 용) market_data_dataset = Dataset("market_data") analysis_dataset = Dataset("analysis_result") def create_market_data(**context): """마켓 데이터를 생성하고 Dataset을 업데이트""" logging.info("마켓 데이터 생성 시작") # 실제 데이터 생성 로직 query = """ SELECT symbol, AVG(price) as avg_price FROM stock_prices WHERE trade_date = CURDATE() - INTERVAL 1 DAY GROUP BY symbol LIMIT 3 """ result = execute_query(query) market_data = [] if result: for row in result: market_data.append({"symbol": row[0], "avg_price": float(row[1])}) logging.info(f"생성된 마켓 데이터: {market_data}") return market_data dataset_producer_dag = DAG( 'dataset_producer_dag', default_args=default_args, description='Dataset Dependencies - 생산자', schedule='@daily', catchup=False, tags=['cross-dag', 'dataset', 'producer'] ) create_data_task = PythonOperator( task_id='create_market_data', python_callable=create_market_data, outlets=[market_data_dataset], # Dataset 업데이트 알림 dag=dataset_producer_dag ) # Consumer def consume_market_data(**context): """생성된 마켓 데이터를 소비하여 분석""" logging.info("마켓 데이터 소비 및 분석 시작") # Dataset이 업데이트되면 자동으로 실행됨 query = "SELECT COUNT(DISTINCT symbol) FROM stock_prices" result = execute_query(query) symbol_count = result[0][0] if result else 0 analysis = { "total_symbols": symbol_count, "analysis_time": str(datetime.now()), "status": "Dataset 기반 분석 완료" } logging.info(f"Dataset 기반 분석 결과: {analysis}") return analysis def generate_final_report(**context): """최종 리포트 생성""" logging.info("최종 리포트 생성") ti = context['ti'] analysis = ti.xcom_pull(task_ids='consume_market_data') report = f"Dataset 기반 리포트: {analysis.get('total_symbols', 0)}개 심볼 분석 완료" logging.info(report) return report dataset_consumer_dag = DAG( 'dataset_consumer_dag', default_args=default_args, description='Dataset Dependencies - 소비자', schedule=[market_data_dataset], # Dataset이 업데이트되면 실행 catchup=False, tags=['cross-dag', 'dataset', 'consumer'] ) consume_task = PythonOperator( task_id='consume_market_data', python_callable=consume_market_data, outlets=[analysis_dataset], # 다음 Dataset 업데이트 dag=dataset_consumer_dag ) report_task = PythonOperator( task_id='generate_final_report', python_callable=generate_final_report, dag=dataset_consumer_dag ) consume_task >> report_task
-
미해결베개 투자법: 자면서 돈 버는 AI 주식 자동 매매 머신
미래에셋의 API 적용 가능성
강의 "베개 투자법: 자면서 돈 버는 AI 주식 자동 매매 머신" 에서 한국투자증권의 API 가 사용되었는데, 이것을 다른 증권회사 예를 들면 미래에셋의 API 를 사용하여 실전에 적용하는 것도 가능한가요? 이때 주의점은 무엇인가요
-
해결됨[퇴근후딴짓] 빅데이터 분석기사 실기 (작업형1,2,3)
합격했습니다! 감사합니다.
방금 사전점수 확인했는데 합격이네요 ㅎㅎ강사님 덕에 1트에 바로 합격한 것 같습니다. 강의가 정말 도움 많이 되었고, 질문을 많이 남겼는데 답변도 빠르게 해주셔서 보다 빠르고 효율적으로 공부할 수 있었던 것 같습니다. 감사합니다!여담이지만 2유형은 그냥 무지성 randomforest에다가 train_test_split 검증도 안 하고 score 확인도 안 한 채로 코드 실행되는 것만 확인하고 제출했더니 5점 감점됐고, 3유형에선 민감도 구하는 문제에서 민감도가 뭐였는지가 헷갈렸는데 아마 이거 틀린 거 같네요 ㅋㅋㅋㅠㅠ
-
미해결[퇴근후딴짓] 빅데이터 분석기사 실기 (작업형1,2,3)
사전점수 공개 재검토 요청
오늘 사전점수가 공개됐는데 랜덤포레스트 사용한 사람들은 다 감점이라네요,,ㅠㅠ전처리할 게 아예 없었는데 재검토 요청하면 수용되는 경우도 있나요?
-
미해결빅데이터분석기사 실기 R 올인원: 3주에 끝내는 완벽 대비
실전문제 풀이(1) - 문제 3
같은 코드를 입력한 것 같은데, lb, ub 값이 많이 다릅니다.어디서 오류가 생긴건지 알려주실 수 있을까요..?
-
미해결[2025] SQLD 문제가 어려운 당신을 위한 노랭이 176 문제 풀이
DIVIDE 인지 어떻게 판단할 수 있나요?
"비선호 컨텐츠 엔티티에 등록된 데이터에 대해서는 추천을 수행하지 않아야한다."이 부분은 LEFT JOIN 을 하고 IS NULL 로도 체크가 가능하지 않을까 생각이 드는데, 설명에서 저 구분은 DIVIDE 이기 때문에 NOT EXISTS 로 해야된다고 하셨는데, 판단 이유가 궁급합니다!
-
미해결15일간의 빅데이터 파일럿 프로젝트
클라우데라 매니저 접속 불가 및 로그인 정보 문의
안녕하세요, 강사님.빅데이터 파일럿 프로젝트 강의를 수강 중입니다.현재 강의 내용대로 Server01과 Server02를 모두 실행했고,http://server.hadoop.com:7180 주소로 클라우데라 매니저에 접속을 시도했는데웹 페이지 자체가 열리지 않습니다.또한 강의 자료에서 로그인 아이디와 패스워드 정보도 찾을 수 없었습니다.혹시클라우데라 매니저 기본 접속 주소기본 로그인 아이디와 패스워드접속이 되지 않을 때 점검해야 할 설정을 알려주실 수 있을까요?처음 환경을 구성하는 단계라서 안내해 주시면 큰 도움이 될 것 같습니다.감사합니다.
-
해결됨[2025년 최신 기출 반영] 빅데이터 분석 기사 실기 시험 100% 합격 ! 기출 문제의 패턴이 보인다 !
코랩 기본 사용법 문의
코랩을 처음 사용하면서 궁금한점이 있어서 여쭈어봅니다. 코랩 왼쪽 폴더 그림을 클릭하면 파일 탐색기 같은 화면에서 가장 첫번째 위치에 있는 '..폴더'를 더블클릭하면 bin, boot, ... 여러폴더가 나오는데, 여기에서 다시 이전 경로로 돌아가려면 어떻게 해야 하나요?
-
해결됨[퇴근후딴짓] 빅데이터 분석기사 실기 (작업형1,2,3)
수강기간 연장 문의드립니다.
안녕하세요 빅분기 실기 강의 수강생입니다. 다름이 아니라, 제가 이번 실기 시험을 응시했지만 떨어질 것 같아서 미처 다 듣지 못한 나머지 강의들을 마저 수강하고자 하는데혹시 2주 정도 강의 연장이 가능할 지 문의드립니다. 제 메일 주소는 ytb.nayun@gmail.com 입니다. (연장제도가 있는지 몰라서 기간 만료일에 촉박하게 문의드리는 점 양해부탁드립니다. ㅠㅠ)
-
미해결[퇴근후딴짓] 빅데이터 분석기사 실기 (작업형1,2,3)
작업형 2유형 질문 드려요..
학습 관련 질문을 남겨주세요. 상세히 작성하면 더 좋아요!질문과 관련된 영상 위치를 알려주면 더 빠르게 답변할 수 있어요먼저 유사한 질문이 있었는지 검색해보세요이번 시험 작업형2에서 범주형 변수가 없어서 원핫인코딩을 진행하지 않았는데 자료형을 보면 int랑 float가 있는데 모든 변수를 int로 바꿔줬어야 했나요...? 다들 평가지표를 보면 0.6..., 0.7... 이런식으로 나오셨는데 저는 0.06..., 0.07.. 이렇게 나왔거든요.. 혹시 작업형2 점수는 0점처리가 될까요???
-
해결됨[퇴근후딴짓] 빅데이터 분석기사 실기 (작업형1,2,3)
강사님 시험 끝내기 버튼 ㅠㅠ
아래 있는 질문과 비슷하긴 한데… 시험 시간 내에 “시험 끝내기“ 버튼을 누르지 못한 채 창이 닫혀버렸습니다ㅠㅠ 아는 문제들은 제출하기를 다 눌렀는데 채점에는 문제가 없을까요? 아예 몰라서 미제출한 문제는 그대로 미제출인채로 두어도 그 문제만 0점 받고 끝인거겠죠?혹시 어떻게 될지 몰라서 다다음주 점수 발표까지 계속 걱정만 할것 같아요…
-
미해결[퇴근후딴짓] 빅데이터 분석기사 실기 (작업형1,2,3)
train_test_split, ttest_1samp
안녕하세요!세가지 질문 있습니다 [작업형 2 관련]오늘 유형2에서 train_test_split을 사용 못하고point = int(len(train)*0.8)x_tr = train[:point]x_val = train[point:]와 같은 형태로 앞에서부터 자른 데이터로 학습/검증 진행했는데.. 이런 경우에, train_test_split 을 사용했을때보다점수가 많이 낮을까요? ㅜㅜpred == y_val 단순 비교 시 87% 일치했습니다(object type column과 결측치는 없어서 인코딩은 생략했습니다 ) [작업형 3-1 관련]1번문제가 특정값=N일 때와 비교하여특정값=P일때의 종속변수 오즈비 구하는 문제였는데특정값=N인 경우는 coefficient가 안나오더라구요..np.exp(P인 경우의 coefficient)로 제출했는데 제가모델 만들때 뭔가 빠트린걸까요? [작업형 3-2 관련]동일집단의 전후 비교를 물어봐서 대응표본검정 문제 같았는데, 집단의 과거 시점 데이터 중에 주어진건특정 컬럼의 “평균값“이라서, 단일표본검정 문제를 풀듯이ttest_1samp(df[“A“], (A의 과거 평균값) )으로 넣고 풀었습니다. 정확한 출제 내용 없이 질문드려서 좀 그런데..제가 유형 분석을 맞게 한걸까요?
-
해결됨[퇴근후딴짓] 빅데이터 분석기사 실기 (작업형1,2,3)
수치형 데이터 원핫인코딩 수행
학습 관련 질문을 남겨주세요. 상세히 작성하면 더 좋아요!질문과 관련된 영상 위치를 알려주면 더 빠르게 답변할 수 있어요먼저 유사한 질문이 있었는지 검색해보세요오늘 시험 보고 왔습니다! 벼락치기 하다보니..ㅠㅠ 랜덤포레스트로 풀 수 있는 템플릿 자체를 외워 제출하다보니 범주형 데이터가 아닌데도 .. 원핫인코딩을 수행했습니다ㅜㅜ 마지막 제출하기 전에 컬럼 수도 비교해봤는데 test와 동일했고 성능도 다른 분들이랑 별 차이가 안나는데.. 문제가 될 수 있을까요?2유형에서 깎이지만 않는다면 65점으로 아슬하지만 합격할 것 같습니다ㅠㅠ 선생님 덕분입니다!!!
-
해결됨[퇴근후딴짓] 빅데이터 분석기사 실기 (작업형1,2,3)
오늘 있었던 시험 질문입니다..
학습 관련 질문을 남겨주세요. 상세히 작성하면 더 좋아요!질문과 관련된 영상 위치를 알려주면 더 빠르게 답변할 수 있어요먼저 유사한 질문이 있었는지 검색해보세요안녕하세요 선생님 일단 그동안 좋은강의 해주셔서 감사합니다 작업형 2에서 오브젝트 컬럼이 없어가지고 인코딩 과정을 생략했는데 그래도 괜찮을까요?평가해보진 않고 바로 제출했는데 오류는 없었씁니다 ㅠㅠ제가 정확히 기억이 안나는데.. .혹시 작업형 3에서 1번인가 2번문제에서 선형회기나 로지스틱회기에서 일부 독립변수만 선택하라는 문항이 있었을까요? 귀신에 씌인것처럼 그런 문항이 있었는지 갑자기 헷갈리네요 ㅠㅠ
-
해결됨[퇴근후딴짓] 빅데이터 분석기사 실기 (작업형1,2,3)
2유형 제출 시
학습 관련 질문을 남겨주세요. 상세히 작성하면 더 좋아요!질문과 관련된 영상 위치를 알려주면 더 빠르게 답변할 수 있어요먼저 유사한 질문이 있었는지 검색해보세요2유형 제출할 때pd.DataFrame({'pred‘:pred}).to_csv('result.csv' , index=False)print(pd.read_csv('result.csv')) 이렇게만 하고 제출해도 되나여??
-
해결됨[퇴근후딴짓] 빅데이터 분석기사 실기 (작업형1,2,3)
시험 제출 관련 질문
바로 질문을 먼저 말씀드리면각 문제별 제출은 완료한 상태에서시험시간이 지나고 시험끝내기 버튼을 누르면 0점 처리 되나요?(각 문제 제출은 했으나, 최종 제출 미제출 확인 못함) 혹시 관련해서 아시는게 있으시면 답변부탁드립니다.. 선생님 방금 시험 마친 수강생입니다.각 문제를 다 풀고 제출까지 완료한 후 검산을 시작했습니다.검산하다 시간이 얼마 안남았을 때 작업형1에 3번문제를 잘못 푼 것을 확인했습니다. 결국 답을 바꾸지 못한채 당황하다가 시험이 종료되는 시점에 시험 끝내기를 누르고 내용 동의 및 시험 끝내기를 눌렀습니다.시간 안에 최종적으로 제출해야하는지 알고 시험 끝내기를 누르고 당황해서 제출 미제출을 확인하지 못하고 '동의 및 시험 끝내기'를 눌렀습니다. 갑자기 멘붕와서 제출현황을 확인할 생각을 못했습니다. 문의하거나 관련 글이 있는지 찾아볼 생각으로 바로 나오기에 급급했습니다 ㅠ...이런상황에서 0점처리되는지 혹시 아시는게 있으실까요?
-
해결됨[퇴근후딴짓] 빅데이터 분석기사 실기 (작업형1,2,3)
제출 질문
학습 관련 질문을 남겨주세요. 상세히 작성하면 더 좋아요!질문과 관련된 영상 위치를 알려주면 더 빠르게 답변할 수 있어요먼저 유사한 질문이 있었는지 검색해보세요안녕하세요! 오늘 실기시험 보고왔습니다.시험 끝내기 버튼 누르고 제출 현황을 봤는데 풀이용은 다 미제출로 되어있더라고요. 혹시 이게 문제가 될까요....?
-
해결됨[퇴근후딴짓] 빅데이터 분석기사 실기 (작업형1,2,3)
11회차 풀이는 언제 업로드 될까요??
좋은 강의 해주신 덕분에 그래도 대부분 문제에서 뭐라도 손대볼 수 있었습니다. 감사합니다.제대로 제가 푼건지 궁금한데 11회차 풀이는 언제쯤 업로드 될까요??
-
해결됨[퇴근후딴짓] 빅데이터 분석기사 실기 (작업형1,2,3)
2유형 여쭤보고 싶습니다 ..!!
하필 model.selection 코드가 기억이 안나서 결국 검증을 못하고 rf로 돌려서 냈습니다..ㅠㅠ 오늘 같은 경우에는 전처리 과정이 따로 없어서 모델 튜닝으로 점수가 갈릴것같아서 걱정이 됩니다..이런경우에도 30점 이상 받을 수 있을까요 ?