인프런 커뮤니티 질문&답변
/homework/ch10_3/dataframe_cache.py 과제 질문
작성
·
26
0
안녕하세요.
과제 도중 질문이 있어 질문 남깁니다.
우선 아래에 제가 작성한 코드 남기겠습니다!
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import time
spark = SparkSession.builder.appName("dataframe_cache").getOrCreate()
# 회사별 산업도메인 CSV READ
com_ind_path = 'hdfs://home/spark/sample/linkedin_jobs/company_industries.csv'
com_ind_schema= 'company_id STRING, industry STRING'
ind_df = spark.read.option("header", "true").option("multiline", "true").schema(com_ind_schema).csv(com_ind_path)
# 회사별 종업원 수 CSV READ
com_count_path = 'hdfs://home/spark/sample/linkedin_jobs/employee_counts.csv'
com_count_schema = 'company_id STRING, employee_count INT, follower_count INT, time_recorded INT'
count_df = spark.read.option("header", "true").option("multiline", "true").schema(com_count_schema).csv(com_count_path)
# company_id 컬럼으로 중복 제거 후 진행
# drop_duplicate : transform 함수
company_count_df = count_df.dropDuplicates(['company_id'])
# 캐시 저장
ind_df.persist()
company_count_df.persist()
# count : action 함수
print(ind_df.count())
print(company_count_df.count())
# filter : transform 함수
# it_df : 산업도메인이 IT Service and IT Consulting인 회사
# big_df : 직원 수가 1000명 이상인 회사
it_df = ind_df.filter(col('industry') == 'IT Services and IT Consulting')
big_df = company_count_df.filter(col('employee_count') >= 1000)
# join : transform 함수
it_big_df = it_df.join(big_df,'company_id','inner')
# 결과 출력
it_big_df.select(['company_id','employee_count']).sort('employee_count',ascending=False).show()
# 5분 대기
time.sleep(300)
저는 join 작업시 중복을 제거한 직원수 dataframe이 사용되기 때문에 dropDuplicates() 메서드를 적용한 뒤 persist()를 하여 캐시에 저장하였습니다.
그런데 강사님의 깃허브 코드를 확인하니, 중복을 제거한 dataframe에 persist()를 적용하지 않아서 제가 잘못 생각하고 있는 부분이 있는지 궁금합니다.
답변 2
0
안녕하세요, 지구본님!
좋은 질문입니다.
우선 지구본님께서 작성하신것 처럼 작성하셔도 됩니다. 그리고 만약 아래처럼 doropDuplicates 할 때
company_count_df = count_df.dropDuplicates(['company_id'])그 결과가 대폭 줄어든다면 (ex: 수십 GB → 수백MB) 오히려 지구본님께서 작성하신 게 더 좋습니다.
제가 github에 올린 코드에서 dropDuplicates 연산 뒤에 persist()를 하지 않은 이유는 우선 company_emp_df 를 읽어들인 후 제일 먼저 count()를 먼저 확인해보는 과정이 있는데 count() 전에 persist()를 해놓지 않으면 결국 HDFS에서 두 번 읽어들이게 됩니다.
(company_emp_df를 로드한 직후에 수행한 count()에서 한 번, 맨 마지막 show()에서 한 번)
그래서 우선 company_emp_df를 로드하자마자 일단 persist()를 해 놓은 것인고, dropDuplicates 이후에도 persist()를 하면 좋겠지만 실습상 데이터가 그리 크지 않으므로 생략했다고 보시면 되겠습니다.
만약 데이터프레임을 로드하자마자 count() 확인해보는 과정이 없다면, 지구본님께서 작성하신 것처럼 dropDulicates() 와 같은 transform 함수 수행 이후에 persist() 하는 게 더 좋습니다.
좋은 질문이에요 ^^
화이팅입니다 😀
-1
안녕하세요, 인프런 AI 인턴입니다.
지구본님이 작성하신 코드를 보면, dropDuplicates() 메서드를 통해 중복을 제거한 후 persist() 메서드를 사용하여 데이터프레임을 캐시에 저장하셨습니다. 이는 특정 작업을 반복적으로 수행하거나 데이터를 여러 번 사용해야 할 때 매우 유용한 접근 방법입니다. persist()는 데이터가 메모리에 유지되어 이후의 연산에서 다시 계산하지 않도록 해 주므로 성능 향상에 도움이 될 수 있습니다.
하지만 모든 경우에 persist()를 사용해야 하는 것은 아닙니다. persist()는 메모리를 사용하기 때문에, 불필요하게 사용하면 메모리 낭비를 초래하여 시스템 자원을 과도하게 사용할 수 있습니다. 만약 중복을 제거한 후의 데이터프레임을 여러 번 사용하지 않거나, 한 번만 사용되며 크기가 크지 않다면, persist()를 사용하지 않고 필요할 때 새로 계산하는 것이 효율적일 수 있습니다.
강사님의 코드는 이런 자원 사용의 최적화를 고려하여 persist()를 사용하지 않은 것일 수 있습니다. 사용자가 자주 사용하는 경우에만 persist()를 사용하여 메모리를 효과적으로 활용하십시오. 적절한 장소에서의 캐싱은 성능 향상에 큰 도움이 될 수 있으므로 사용 시의 장단점을 잘 고려하여 적용하는 것이 좋습니다.
참고 링크:
- Spark Caching and Persistence




