묻고 답해요
167만명의 커뮤니티!! 함께 토론해봐요.
인프런 TOP Writers
-
미해결Kafka & Spark 활용한 Realtime Datalake
subnet이 a~c까지만 있어요
kafka-broker03을 2d에 생성하고 싶은데, ap-southeast에 2a~2c까지만 있으면 2d는 수동으로 생성해야 하나요?
-
미해결Kafka & Spark 활용한 Realtime Datalake
[과제10-3] count 하지 않을때 `persist` 함수의 의미와 순서도에서 `scan csv`가 세번 나온 이유
질문만약 문제에서 카운트를 출력하라는 조건이 없었으면 persist를 호출 하지않아도 같은 퍼포먼스가 나오나요? (문제: ... 각각 데이터를 로드한 후 카운트를 출력하고,...)count 함수를 제외하면 action 계열 함수가 없는데 count 함수가 없는 경우에도 persist가 유의미 한지 궁금합니다. [SQL/DataFrame] - [Details for Query0] 순서도(마지막에 첨부) 에서 Scan csv 가 3번 나타나는 이유는 무엇인가요?? csv 파일을 읽어오는 함수는 두번 실행했는데 왜 3번 실행되는지 궁금합니다. 참고목차질문-참고-코드-순서도코드 구조HDFS 경로 정의스키마 정의데이터프레임 생성time_recorded에 의한 중복열 제거employee_count 컬럼 추가를 위한 joinIT Services and IT Consulting 회사 추출두 테이블 결합결과 출력dropDuplicate가 정확히 어떤 행을 drop 하는지 몰라서 egg-max 조합으로 중복 행을 제거함코드""" >>> company_industries_df.printSchema() root |-- company_id: string (nullable = true) |-- industry: string (nullable = true) >>> company_industries_df.show(5) +----------+--------------------+ |company_id| industry| +----------+--------------------+ | 391906|Book and Periodic...| | 22292832| Construction| | 20300| Banking| | 3570660|Book and Periodic...| | 878353|Staffing and Recr...| +----------+--------------------+ only showing top 5 rows >>> employee_counts_df.printSchema() root |-- company_id: string (nullable = true) |-- employee_count: string (nullable = true) |-- follower_count: string (nullable = true) |-- time_recorded: string (nullable = true) >>> employee_counts_df.show(5) +----------+--------------+--------------+-------------+ |company_id|employee_count|follower_count|time_recorded| +----------+--------------+--------------+-------------+ | 391906| 186| 32508| 1712346173| | 22292832| 311| 4471| 1712346173| | 20300| 1053| 6554| 1712346173| | 3570660| 383| 35241| 1712346173| | 878353| 52| 26397| 1712346173| +----------+--------------+--------------+-------------+ only showing top 5 rows +----------+--------------------+--------------+--------------+-------------+ |company_id| industry|employee_count|follower_count|time_recorded| +----------+--------------------+--------------+--------------+-------------+ | 1353|IT Services and I...| 596046| 14829798| 1713536641| | 1353|IT Services and I...| 595935| 14822564| 1713448937| | 1353|IT Services and I...| 595844| 14817360| 1713407024| | 1353|IT Services and I...| 595782| 14807439| 1713279321| | 1353|IT Services and I...| 595669| 14800204| 1713208435| +----------+--------------------+--------------+--------------+-------------+ only showing top 5 rows """ import time from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StructField, StringType, IntegerType from pyspark.sql.functions import col, max spark = SparkSession.builder.getOrCreate() # 1. 파일 경로 정의 company_industries_path = ( "/home/spark/sample/linkedin_jobs/companies/company_industries.csv" ) employee_counts_path = "/home/spark/sample/linkedin_jobs/companies/employee_counts.csv" # 2. 스키마 정의 company_industries_schema = StructType( [ StructField("company_id", IntegerType(), True), StructField("industry", StringType(), True), ] ) employee_counts_schema = StructType( [ StructField("company_id", IntegerType(), True), StructField("employee_count", IntegerType(), True), StructField("follower_count", IntegerType(), True), StructField("time_recorded", IntegerType(), True), ] ) # 3. 데이터프레임 생성 company_industries_df = ( spark.read.option("header", "true") .option("multiLine", "true") .schema(company_industries_schema) .csv(company_industries_path) ) employee_counts_df = ( spark.read.option("header", "true") .option("multiLine", "true") .schema(employee_counts_schema) .csv(employee_counts_path) ) # 4. 중복 제거: groupby(company_id) -> agg -> max(time_recorded) agg_latest_employee_counts_df = employee_counts_df.groupBy("company_id").agg( max("time_recorded").alias("time_recorded") ) # 5. employee_count 컬럼 추가 latest_employee_counts_df = agg_latest_employee_counts_df.join( employee_counts_df, on=["company_id", "time_recorded"], how="inner", ) # 6. 필터링: 업종이 IT Services and IT Consulting인 회사 추출 industry = "IT Services and IT Consulting" it_services_company_industries_df = company_industries_df.filter( col("industry") == industry ) # 7.조인: 회사 정보와 종업원 수 결합 및 필터링: 종업원수 1000명 이상 및 정렬: 종업원수 내림차순 정렬 result_df = ( it_services_company_industries_df.join( latest_employee_counts_df, on="company_id", how="inner", ) .filter(col("employee_count") >= 1000) .orderBy(col("employee_count").desc()) ) # 8. 'company_id', 'employee_count', 컬럼만 출력 result_df.select("company_id", "employee_count").show() time.sleep(300)순서도 이미지가 압축되어 올라가 [링크] 첨부합니다.
-
미해결Kafka & Spark 활용한 Realtime Datalake
[오타신고] 깃허브 레포지토리 내에 폴더명내 오타 (requrements.txt)
안녕하세요!레포지토리에 오타가 있어서 신고합니다.requrements.txt(파일명에 'i'가 빠져있어요.) 저는 경로까지 복사 해서 사용하는데 오타가 있으니 deploy/after_install.sh 설정과 달라 배포가 안되더라구요 ^^; 혹시 저와 같은 분들 계실까봐 남겨 놓습니다.https://github.com/hjkim-sun/datalake-pyspark-apps-season1/blob/ch8.5/requrements.txt 추신: 강의가 너무 좋습니다. 강의자료도 강의 진행도 너무 훌륭해요! 후속 강의도 너무 너무 기대됩니다! 다음 강의는 어떤걸 준비하고 계신지, 또 언제쯤 나올지 너무 궁금해요! 얼른 강의 내주세요 ㅎㅎ
-
미해결Kafka & Spark 활용한 Realtime Datalake
kafka 개념 질문
안녕하세요, 우선 데이터 관련 좋은 강의 올려주셔 감사합니다.수강 중 실습에서 AWS kafka 서버가 어떤 역할인지 헷갈려 질문 드립니다. 위 강의 pdf와 같이 지금까지 producer가 서버에 push하고 consumer가 서버(브로커)에 있는 메시지를 pull 하는 것으로 이해했고, AWS에 만든 3개의 kafka-broker01~03이 서버를 구성하는 3개의 '브로커'라고 이해했는데,실습을 진행하면서 AWS kafka-broker서버에서 파티션을 만들었으니 브로커는 맞는 것 같으나 동시에 producer, consumer 역할을 하는 것 같아서 원래 브로커가 서버를 구성하는 동시에 클라이언트의 역할도 하는 것인지 이해하는데 어려움을 겪고 있습니다.만약 그렇다면 kafka02에서 produce하고 kafka01에서 consume한다고 할 때, kafka02도 kafka01와 같은 cluster에 속하니 kafka02의 파티션에도 메시지가 저장되고 있는 것이 맞나요...?긴 글 읽어주셔서 감사드립니다.
-
미해결Kafka & Spark 활용한 Realtime Datalake
강사님 필기하실 때 어떤 프로그램 사용하시나요??
안녕하세요! 항상 강의 잘 듣고 있는 1인입니다. 필기시 사용하시는 프로그램이 궁금해서 질문드립니다!아 그리고 강의 소개쪽에 나와있는 디스코드 채널 링크를 클릭하면 '올바르지 않은 초대장' 이라고 나오는데 이 부분도 한 번 봐주시면 감사하겠습니다. 😄
-
미해결Kafka & Spark 활용한 Realtime Datalake
spark01 인스턴스 생성시 문제점 발생
우분투 서버 선택하는부분에 강사님과 다른 목록밖에 없어서 인스턴스 시작이 되고있지 않습니다.
-
미해결Kafka & Spark 활용한 Realtime Datalake
python auto_commit_consumer.py 실행 이후
실행이 되지 않습니다. 원인이 어떤 것들이 있을까요?
-
미해결Kafka & Spark 활용한 Realtime Datalake
ui for apache kafka 브라우저 접속했을때
브로커 탭에 브로커 아이디가 1번에 체크표시가 되어있는데 상관없나요? ppt에는 2번으로 설정하신거같은데 그대로 했는데 1번으로 지정되었습니다.
-
미해결실리콘밸리 리더가 알려주는 빅데이터 처리 (Spark)
실습 code 강의자료 문의
친절한 설명과 함께 강의자료를 공유해 주셔서 감사합니다.그런데 현재 강의자료로 올려주신 pdf 외에, 실습에 사용하신 code도 공유해주실 수 있을까요~?수업 중, 내용흐름을 이해하는데 코드를 별도로 볼 수 있다면 큰 도움이 될 것 같습니다.현재는 실습 중 사용된 변수가 어디에서 정의되었는지, 다시 확인하려면 강의를 돌려보면서 확인해야하는 애로사항이 있습니다.
-
해결됨실리콘밸리 리더가 알려주는 빅데이터 처리 (Spark)
강의자료 다운로드 문의
제게 꼭 필요한 강의를 알기쉽게 강의해 주셔서 감사합니다.다름아니라, 강의 중에 교수님께서 강의자료를 올려놓았으니 다운로드 받으라고 하시는데, 어디에서 강의자료를 다운로드 받을 수 있을까요?
-
해결됨Kafka & Spark 활용한 Realtime Datalake
Ansible과 NAT 환경에서 발생 가능한 root 권한 노출 리스크에 대한 질문
안녕하세요, 선생님. 이전 강의에서 저희는 외부에서 직접적으로 private subnet에 접근하지 못하도록 하기 위해 NAT 인스턴스를 생성하고, outbound 트래픽 규칙도 사용자 지정으로 변경하였습니다. 이를 통해 NAT 인스턴스가 외부 접근을 차단하는 일종의 방화벽 역할을 한다고 이해하고 있습니다. 이 구조는 다음과 같이 2단계 방화벽을 형성한다고 생각됩니다:1. 클라이언트에서 NAT로의 접근2. NAT에서 private 서버로의 접근그런데 Ansible에서 ansible_ssh_private_key_file 옵션만으로 .pem 파일을 지정하면 별도의 계정 정보 없이도 private 서버에 접근이 가능하다는 점이 조금 놀라웠습니다. 아마도 ansible.cfg나 inventory에 특정 설정이 추가되어 있어 가능한 것 같긴 하지만, 실제로 .pem만 존재해도 root 권한 접근까지 이어질 수 있다는 점에서 우려가 되었습니다. 특히, 악의적인 사용자가 만약 public NAT 인스턴스에 접근할 수 있는 상황이라면, ansible-playbook 파일과 become: yes 옵션을 활용해 손쉽게 private 서버의 root 권한을 획득할 수 있을 것으로 보입니다. 이런 보안 리스크를 줄이기 위해서는 어떤 식의 아키텍처 개선이나 운영 전략이 필요할지, 보안적으로 권장되는 방식이 있다면 조언을 듣고 싶습니다.항상 감사합니다.
-
해결됨Kafka & Spark 활용한 Realtime Datalake
Ansibleplaybookclone&playbook실행 안됩니다
pdf 3-7 20페이지에git clone https://github.com/hjkim-sun/datalake-ansible-playbook-season1.gitansible_playbooks 실행 하면username과 pw를 입력하라고 나오는데 어떻게 해야되는건가요?
-
미해결Kafka & Spark 활용한 Realtime Datalake
강의안 문제
강의안이 다 안 올라와 있는 것 같습니다
-
미해결Kafka & Spark 활용한 Realtime Datalake
데이터레이크 구성요소
영상 화면이 안나오는것 같습니다!!