묻고 답해요
156만명의 커뮤니티!! 함께 토론해봐요.
인프런 TOP Writers
-
미해결Kafka & Spark 활용한 Realtime Datalake
Spark Programs 구조에서 막혔습니다.
강의를 잘 따라가던 도중에 4040 웹에 접속했는데 저렇게 떴는데 뭐가 문제일까요?
-
미해결Kafka & Spark 활용한 Realtime Datalake
kafka-broker01 ping 반응 없습니다.
kafka-broker01에 ssh로 접속하고 Ping을했는데 회신이 없이 그대로 멈춰있습니다.
-
미해결Kafka & Spark 활용한 Realtime Datalake
github actions 오류 문의 드립니다.
안녕하세요. git push 이후 actions 오류 발생 되어 문의 드립니다.Github 에서 AWS codeDeploy 오류 확인 됩니다.어떤 부분을 체크 해보는게 좋을까요?
-
미해결Kafka & Spark 활용한 Realtime Datalake
github Actions 실행 강의 따라가는도중 막혔습니다.
강의에서는 master.yml도 appspec.yml이랑 똑같던데 제 환경에서 똑같이 따라했는데 master.yml과 appspec.yml의 아이콘이 다르게 뜹니다.git status를 치면 수정함: .github/workflows/master.yml, 수정함: appspec.yml 이렇게 떠야하는데 저는 modified: appspec.yml 이거 하나만 뜹니다. 강사님과 똑같이 따라했습니다. 이전에 git add . 를 한 적이 없음git push를 한 후에 깃헙 Actions에 들어가보면 There are no workflow runs yet. 이라고 뜹니다. 강사님처럼 workflow run이 안 됩니다.뭐가 문제일까요? 강의랑 똑같이 따라갔는데제가 다르게 한 거는 카프카 브로커를 만들때, 인스턴스 유형을 public-nat: t3.micro,kafka-broker: t3.small로 한 거만 다릅니다.
-
미해결Kafka & Spark 활용한 Realtime Datalake
confluent-kafka 의 produce 파라미터(on_devliery)
안녕하세요!먼저 이 강의를 통해 많은 것을 배우고 있습니다. 감사합니다강의를 수강중에 Simple Producer 코드에서 질문이 있습니다. 강의에서 비동기 방식으로 producer를 생성할 때 on_delivery 파라미터를 설명해주셨고, 아래 문서에서 예제 코드를 확인했을 때는 on_delivery가 아닌 callback 으로 파라미터를 받고 있는 것을 확인했습니다.https://docs.confluent.io/kafka-clients/python/current/overview.html 직접 코드를 확인해보니 아래와 같이 alias로 사용하는 것까지 확인했습니다.하지만, callback으로 파라미터를 넘겼을 때 어떻게 on_delivery로 값을 바인딩할 수 있는지에 대한 부분은 찾지 못하여 질문드립니다!alias가 어떻게 바인딩 되는지 어느 코드에서 찾을 수 있을까요? This is an asynchronous operation, an application may use the ``callback`` (alias ``on_delivery``) argument to pass a function (or lambda) that will be called from :py:func:`poll()` when the message has been successfully delivered or permanently fails delivery. confluent-kafka (python) 코드에서 강의 중에 poll() 메소드는 반드시 필요한 것이라고 이해를 했습니다.자바 기반인 apache kafka를 구현한 예제들을 보면 producer에서는 poll() 메소드를 사용하지 않는 것 같아서 질문드립니다.자바 기반인 경우 동작 방식이 달라서 그런걸가요?
-
미해결Kafka & Spark 활용한 Realtime Datalake
디스코드 초대장은 어디로 받나요?
디스코드 초대장은 어디로 받나요?
-
미해결Kafka & Spark 활용한 Realtime Datalake
install_zookeeper 플레이북 실행시 오류 해결
cd /home/ec2-user/downloadswget https://downloads.apache.org/zookeeper/zookeeper-3.8.4/apache-zookeeper-3.8.4-bin.tar.gz 해당 폴더에 압축파일이 없다고 떠서 새로 다운받았습니다!
-
미해결Kafka & Spark 활용한 Realtime Datalake
ec2 인스턴스 생성
100$ 지원해주는 free tier로 계정을 만들고 Support 플랜이 Basic으로 되어있습니다.인스턴스 유형을 선택할 때 기본적으로 t3.micro로 선택이 되어있고t2.micro를 사용하려면 무료 요금제에선 사용할 수 없고 계정 요금제 업그레이드를 하라고 하는데 요금제를 업그레이드 하고 따라가면 될까요?
-
미해결Kafka & Spark 활용한 Realtime Datalake
디스코드 초대장 재발급 부탁드립니다!!
안녕하세요디스코드 초대장이 만료되었다고 떠서 재발급 부탁드립니다!
-
미해결Kafka & Spark 활용한 Realtime Datalake
Kafka Cluster 서버 구축 시 Docker 사용
안녕하세요! 강의 정말 잘 듣고 있습니다! 수업에서 Kafka 클러스터를 구축할 때 EC2 인스턴스 4대를 사용하셨는데요. 혹시 도커 리눅스 컨테이너 4개를 띄워 학습 환경을 구성해도 Kafka 클러스터 실습이 가능할까요?
-
미해결Kafka & Spark 활용한 Realtime Datalake
데이터레이크 구성요소 안나옵니다.
저도 데이터레이크 구성요소 영상 안나옵니다.소리는 나옵니다.위에도 있던데 어떻게 해결되었나요?다른영상은 나옵니다.
-
미해결Kafka & Spark 활용한 Realtime Datalake
codedeploy 배포 실패
안녕하세요! 강사님강의 잘 듣고 있습니다.github actions 실행 부분에서 codedeploy부분이 실패했는데요... github actions에서 성공이 떴고 버킷 이름도 잘 바꿨으며kafka-server에 code-deploy가 있는 것도 확인했습니다. 혹시 뭔가 확인해봐야할 게 있을까요?
-
미해결Kafka & Spark 활용한 Realtime Datalake
CodeDeploy 사용시 registration 요구
안녕하세요! 정말로 좋은 강의 너무 잘 듣고 있습니다. 지금 AWS 세팅하는 섹션을 듣고 있는데요, github actions 세팅하는 부분에서 CodeDeploy를 이용하려고 하면 registration을 요구하는 페이지로 넘어갑니다 (사진 첨부). 근데 전 이미 registration을 다 끝냈습니다. complete your AWS registration 버튼 눌러도 AWS 홈 화면으로만 돌아가네요. 안내문을 보니깐 free plan을 써서 그런 것 같은데...플랜을 업그레이드 해야하는 걸까요..ㅠㅠ
-
미해결Kafka & Spark 활용한 Realtime Datalake
세션1 퀴즈 문제답변관련
먼저 강의 만들어 주셔서 감사합니다.첫번째 강의 퀴즈에 대한 문의입니다. 문제 중 데이터 웨어하우스와 비교할 때, 데이터 레이크의 주요 차이점 중 하나는 무엇일까요?라는 질문에 대한 정답이 '데이터를 저장할 때 스키마 적용 방식'으로 되어있습니다. 이것은 데이터 웨어하우스의 특징이 아닌가요?해설에는 제대로 되어있는데... 정답이 되려면 '데이터를 읽을 때 스키마 적용 방식'이 되어야 할 것 같습니다.아니면 데이터 접근 권한 관리의 엄격성이 정답아닌가요? 전사의 모든 데이터가 모여있는데 웨어하우스는 처음부터 접근할 수 있는 사람들이 정해져 있지만, 레이크는 다양한 계층의 사용자에게 오픈되어 있어 권한 관리가 더 필요할 것이라 생각됩니다.
-
미해결Kafka & Spark 활용한 Realtime Datalake
subnet이 a~c까지만 있어요
kafka-broker03을 2d에 생성하고 싶은데, ap-southeast에 2a~2c까지만 있으면 2d는 수동으로 생성해야 하나요?
-
미해결Kafka & Spark 활용한 Realtime Datalake
[과제10-3] count 하지 않을때 `persist` 함수의 의미와 순서도에서 `scan csv`가 세번 나온 이유
질문만약 문제에서 카운트를 출력하라는 조건이 없었으면 persist를 호출 하지않아도 같은 퍼포먼스가 나오나요? (문제: ... 각각 데이터를 로드한 후 카운트를 출력하고,...)count 함수를 제외하면 action 계열 함수가 없는데 count 함수가 없는 경우에도 persist가 유의미 한지 궁금합니다. [SQL/DataFrame] - [Details for Query0] 순서도(마지막에 첨부) 에서 Scan csv 가 3번 나타나는 이유는 무엇인가요?? csv 파일을 읽어오는 함수는 두번 실행했는데 왜 3번 실행되는지 궁금합니다. 참고목차질문-참고-코드-순서도코드 구조HDFS 경로 정의스키마 정의데이터프레임 생성time_recorded에 의한 중복열 제거employee_count 컬럼 추가를 위한 joinIT Services and IT Consulting 회사 추출두 테이블 결합결과 출력dropDuplicate가 정확히 어떤 행을 drop 하는지 몰라서 egg-max 조합으로 중복 행을 제거함코드""" >>> company_industries_df.printSchema() root |-- company_id: string (nullable = true) |-- industry: string (nullable = true) >>> company_industries_df.show(5) +----------+--------------------+ |company_id| industry| +----------+--------------------+ | 391906|Book and Periodic...| | 22292832| Construction| | 20300| Banking| | 3570660|Book and Periodic...| | 878353|Staffing and Recr...| +----------+--------------------+ only showing top 5 rows >>> employee_counts_df.printSchema() root |-- company_id: string (nullable = true) |-- employee_count: string (nullable = true) |-- follower_count: string (nullable = true) |-- time_recorded: string (nullable = true) >>> employee_counts_df.show(5) +----------+--------------+--------------+-------------+ |company_id|employee_count|follower_count|time_recorded| +----------+--------------+--------------+-------------+ | 391906| 186| 32508| 1712346173| | 22292832| 311| 4471| 1712346173| | 20300| 1053| 6554| 1712346173| | 3570660| 383| 35241| 1712346173| | 878353| 52| 26397| 1712346173| +----------+--------------+--------------+-------------+ only showing top 5 rows +----------+--------------------+--------------+--------------+-------------+ |company_id| industry|employee_count|follower_count|time_recorded| +----------+--------------------+--------------+--------------+-------------+ | 1353|IT Services and I...| 596046| 14829798| 1713536641| | 1353|IT Services and I...| 595935| 14822564| 1713448937| | 1353|IT Services and I...| 595844| 14817360| 1713407024| | 1353|IT Services and I...| 595782| 14807439| 1713279321| | 1353|IT Services and I...| 595669| 14800204| 1713208435| +----------+--------------------+--------------+--------------+-------------+ only showing top 5 rows """ import time from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StructField, StringType, IntegerType from pyspark.sql.functions import col, max spark = SparkSession.builder.getOrCreate() # 1. 파일 경로 정의 company_industries_path = ( "/home/spark/sample/linkedin_jobs/companies/company_industries.csv" ) employee_counts_path = "/home/spark/sample/linkedin_jobs/companies/employee_counts.csv" # 2. 스키마 정의 company_industries_schema = StructType( [ StructField("company_id", IntegerType(), True), StructField("industry", StringType(), True), ] ) employee_counts_schema = StructType( [ StructField("company_id", IntegerType(), True), StructField("employee_count", IntegerType(), True), StructField("follower_count", IntegerType(), True), StructField("time_recorded", IntegerType(), True), ] ) # 3. 데이터프레임 생성 company_industries_df = ( spark.read.option("header", "true") .option("multiLine", "true") .schema(company_industries_schema) .csv(company_industries_path) ) employee_counts_df = ( spark.read.option("header", "true") .option("multiLine", "true") .schema(employee_counts_schema) .csv(employee_counts_path) ) # 4. 중복 제거: groupby(company_id) -> agg -> max(time_recorded) agg_latest_employee_counts_df = employee_counts_df.groupBy("company_id").agg( max("time_recorded").alias("time_recorded") ) # 5. employee_count 컬럼 추가 latest_employee_counts_df = agg_latest_employee_counts_df.join( employee_counts_df, on=["company_id", "time_recorded"], how="inner", ) # 6. 필터링: 업종이 IT Services and IT Consulting인 회사 추출 industry = "IT Services and IT Consulting" it_services_company_industries_df = company_industries_df.filter( col("industry") == industry ) # 7.조인: 회사 정보와 종업원 수 결합 및 필터링: 종업원수 1000명 이상 및 정렬: 종업원수 내림차순 정렬 result_df = ( it_services_company_industries_df.join( latest_employee_counts_df, on="company_id", how="inner", ) .filter(col("employee_count") >= 1000) .orderBy(col("employee_count").desc()) ) # 8. 'company_id', 'employee_count', 컬럼만 출력 result_df.select("company_id", "employee_count").show() time.sleep(300)순서도 이미지가 압축되어 올라가 [링크] 첨부합니다.
-
미해결Kafka & Spark 활용한 Realtime Datalake
[오타신고] 깃허브 레포지토리 내에 폴더명내 오타 (requrements.txt)
안녕하세요!레포지토리에 오타가 있어서 신고합니다.requrements.txt(파일명에 'i'가 빠져있어요.) 저는 경로까지 복사 해서 사용하는데 오타가 있으니 deploy/after_install.sh 설정과 달라 배포가 안되더라구요 ^^; 혹시 저와 같은 분들 계실까봐 남겨 놓습니다.https://github.com/hjkim-sun/datalake-pyspark-apps-season1/blob/ch8.5/requrements.txt 추신: 강의가 너무 좋습니다. 강의자료도 강의 진행도 너무 훌륭해요! 후속 강의도 너무 너무 기대됩니다! 다음 강의는 어떤걸 준비하고 계신지, 또 언제쯤 나올지 너무 궁금해요! 얼른 강의 내주세요 ㅎㅎ
-
미해결Kafka & Spark 활용한 Realtime Datalake
kafka 개념 질문
안녕하세요, 우선 데이터 관련 좋은 강의 올려주셔 감사합니다.수강 중 실습에서 AWS kafka 서버가 어떤 역할인지 헷갈려 질문 드립니다. 위 강의 pdf와 같이 지금까지 producer가 서버에 push하고 consumer가 서버(브로커)에 있는 메시지를 pull 하는 것으로 이해했고, AWS에 만든 3개의 kafka-broker01~03이 서버를 구성하는 3개의 '브로커'라고 이해했는데,실습을 진행하면서 AWS kafka-broker서버에서 파티션을 만들었으니 브로커는 맞는 것 같으나 동시에 producer, consumer 역할을 하는 것 같아서 원래 브로커가 서버를 구성하는 동시에 클라이언트의 역할도 하는 것인지 이해하는데 어려움을 겪고 있습니다.만약 그렇다면 kafka02에서 produce하고 kafka01에서 consume한다고 할 때, kafka02도 kafka01와 같은 cluster에 속하니 kafka02의 파티션에도 메시지가 저장되고 있는 것이 맞나요...?긴 글 읽어주셔서 감사드립니다.
-
미해결Kafka & Spark 활용한 Realtime Datalake
강사님 필기하실 때 어떤 프로그램 사용하시나요??
안녕하세요! 항상 강의 잘 듣고 있는 1인입니다. 필기시 사용하시는 프로그램이 궁금해서 질문드립니다!아 그리고 강의 소개쪽에 나와있는 디스코드 채널 링크를 클릭하면 '올바르지 않은 초대장' 이라고 나오는데 이 부분도 한 번 봐주시면 감사하겠습니다. 😄
-
미해결Kafka & Spark 활용한 Realtime Datalake
spark01 인스턴스 생성시 문제점 발생
우분투 서버 선택하는부분에 강사님과 다른 목록밖에 없어서 인스턴스 시작이 되고있지 않습니다.