인프런 커뮤니티 질문&답변
/homework/ch10_3/dataframe_cache.py 과제 질문
작성
·
19
0
안녕하세요.
과제 도중 질문이 있어 질문 남깁니다.
우선 아래에 제가 작성한 코드 남기겠습니다!
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import time
spark = SparkSession.builder.appName("dataframe_cache").getOrCreate()
# 회사별 산업도메인 CSV READ
com_ind_path = 'hdfs://home/spark/sample/linkedin_jobs/company_industries.csv'
com_ind_schema= 'company_id STRING, industry STRING'
ind_df = spark.read.option("header", "true").option("multiline", "true").schema(com_ind_schema).csv(com_ind_path)
# 회사별 종업원 수 CSV READ
com_count_path = 'hdfs://home/spark/sample/linkedin_jobs/employee_counts.csv'
com_count_schema = 'company_id STRING, employee_count INT, follower_count INT, time_recorded INT'
count_df = spark.read.option("header", "true").option("multiline", "true").schema(com_count_schema).csv(com_count_path)
# company_id 컬럼으로 중복 제거 후 진행
# drop_duplicate : transform 함수
company_count_df = count_df.dropDuplicates(['company_id'])
# 캐시 저장
ind_df.persist()
company_count_df.persist()
# count : action 함수
print(ind_df.count())
print(company_count_df.count())
# filter : transform 함수
# it_df : 산업도메인이 IT Service and IT Consulting인 회사
# big_df : 직원 수가 1000명 이상인 회사
it_df = ind_df.filter(col('industry') == 'IT Services and IT Consulting')
big_df = company_count_df.filter(col('employee_count') >= 1000)
# join : transform 함수
it_big_df = it_df.join(big_df,'company_id','inner')
# 결과 출력
it_big_df.select(['company_id','employee_count']).sort('employee_count',ascending=False).show()
# 5분 대기
time.sleep(300)
저는 join 작업시 중복을 제거한 직원수 dataframe이 사용되기 때문에 dropDuplicates() 메서드를 적용한 뒤 persist()를 하여 캐시에 저장하였습니다.
그런데 강사님의 깃허브 코드를 확인하니, 중복을 제거한 dataframe에 persist()를 적용하지 않아서 제가 잘못 생각하고 있는 부분이 있는지 궁금합니다.
답변
답변을 기다리고 있는 질문이에요
첫번째 답변을 남겨보세요!




