강의

멘토링

커뮤니티

인프런 커뮤니티 질문&답변

지구본님의 프로필 이미지
지구본

작성한 질문수

Kafka & Spark 활용한 Realtime Datalake

DataFrame 파티션

/homework/ch10_3/dataframe_cache.py 과제 질문

작성

·

19

0

안녕하세요.
과제 도중 질문이 있어 질문 남깁니다.
우선 아래에 제가 작성한 코드 남기겠습니다!

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import time

spark = SparkSession.builder.appName("dataframe_cache").getOrCreate()

# 회사별 산업도메인 CSV READ
com_ind_path = 'hdfs://home/spark/sample/linkedin_jobs/company_industries.csv'
com_ind_schema= 'company_id STRING, industry STRING'
ind_df = spark.read.option("header", "true").option("multiline", "true").schema(com_ind_schema).csv(com_ind_path)

# 회사별 종업원 수 CSV READ
com_count_path = 'hdfs://home/spark/sample/linkedin_jobs/employee_counts.csv'
com_count_schema = 'company_id STRING, employee_count INT, follower_count INT, time_recorded INT'
count_df = spark.read.option("header", "true").option("multiline", "true").schema(com_count_schema).csv(com_count_path)

# company_id 컬럼으로 중복 제거 후 진행
# drop_duplicate : transform 함수
company_count_df = count_df.dropDuplicates(['company_id'])

# 캐시 저장
ind_df.persist()
company_count_df.persist()

# count : action 함수
print(ind_df.count())
print(company_count_df.count())

# filter : transform 함수
# it_df : 산업도메인이 IT Service and IT Consulting인 회사
# big_df : 직원 수가 1000명 이상인 회사
it_df = ind_df.filter(col('industry') == 'IT Services and IT Consulting')
big_df = company_count_df.filter(col('employee_count') >= 1000)

# join : transform 함수
it_big_df = it_df.join(big_df,'company_id','inner')

# 결과 출력
it_big_df.select(['company_id','employee_count']).sort('employee_count',ascending=False).show()

# 5분 대기
time.sleep(300)


저는 join 작업시 중복을 제거한 직원수 dataframe이 사용되기 때문에 dropDuplicates() 메서드를 적용한 뒤 persist()를 하여 캐시에 저장하였습니다.

그런데 강사님의 깃허브 코드를 확인하니, 중복을 제거한 dataframe에 persist()를 적용하지 않아서 제가 잘못 생각하고 있는 부분이 있는지 궁금합니다.

답변

답변을 기다리고 있는 질문이에요
첫번째 답변을 남겨보세요!
지구본님의 프로필 이미지
지구본

작성한 질문수

질문하기