/homework/ch10_3/dataframe_cache.py 과제 질문
69
投稿した質問数 2
안녕하세요.
과제 도중 질문이 있어 질문 남깁니다.
우선 아래에 제가 작성한 코드 남기겠습니다!
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import time
spark = SparkSession.builder.appName("dataframe_cache").getOrCreate()
# 회사별 산업도메인 CSV READ
com_ind_path = 'hdfs://home/spark/sample/linkedin_jobs/company_industries.csv'
com_ind_schema= 'company_id STRING, industry STRING'
ind_df = spark.read.option("header", "true").option("multiline", "true").schema(com_ind_schema).csv(com_ind_path)
# 회사별 종업원 수 CSV READ
com_count_path = 'hdfs://home/spark/sample/linkedin_jobs/employee_counts.csv'
com_count_schema = 'company_id STRING, employee_count INT, follower_count INT, time_recorded INT'
count_df = spark.read.option("header", "true").option("multiline", "true").schema(com_count_schema).csv(com_count_path)
# company_id 컬럼으로 중복 제거 후 진행
# drop_duplicate : transform 함수
company_count_df = count_df.dropDuplicates(['company_id'])
# 캐시 저장
ind_df.persist()
company_count_df.persist()
# count : action 함수
print(ind_df.count())
print(company_count_df.count())
# filter : transform 함수
# it_df : 산업도메인이 IT Service and IT Consulting인 회사
# big_df : 직원 수가 1000명 이상인 회사
it_df = ind_df.filter(col('industry') == 'IT Services and IT Consulting')
big_df = company_count_df.filter(col('employee_count') >= 1000)
# join : transform 함수
it_big_df = it_df.join(big_df,'company_id','inner')
# 결과 출력
it_big_df.select(['company_id','employee_count']).sort('employee_count',ascending=False).show()
# 5분 대기
time.sleep(300)
저는 join 작업시 중복을 제거한 직원수 dataframe이 사용되기 때문에 dropDuplicates() 메서드를 적용한 뒤 persist()를 하여 캐시에 저장하였습니다.
그런데 강사님의 깃허브 코드를 확인하니, 중복을 제거한 dataframe에 persist()를 적용하지 않아서 제가 잘못 생각하고 있는 부분이 있는지 궁금합니다.
回答 2
0
안녕하세요, 지구본님!
좋은 질문입니다.
우선 지구본님께서 작성하신것 처럼 작성하셔도 됩니다. 그리고 만약 아래처럼 doropDuplicates 할 때
company_count_df = count_df.dropDuplicates(['company_id'])그 결과가 대폭 줄어든다면 (ex: 수십 GB → 수백MB) 오히려 지구본님께서 작성하신 게 더 좋습니다.
제가 github에 올린 코드에서 dropDuplicates 연산 뒤에 persist()를 하지 않은 이유는 우선 company_emp_df 를 읽어들인 후 제일 먼저 count()를 먼저 확인해보는 과정이 있는데 count() 전에 persist()를 해놓지 않으면 결국 HDFS에서 두 번 읽어들이게 됩니다.
(company_emp_df를 로드한 직후에 수행한 count()에서 한 번, 맨 마지막 show()에서 한 번)
그래서 우선 company_emp_df를 로드하자마자 일단 persist()를 해 놓은 것인고, dropDuplicates 이후에도 persist()를 하면 좋겠지만 실습상 데이터가 그리 크지 않으므로 생략했다고 보시면 되겠습니다.
만약 데이터프레임을 로드하자마자 count() 확인해보는 과정이 없다면, 지구본님께서 작성하신 것처럼 dropDulicates() 와 같은 transform 함수 수행 이후에 persist() 하는 게 더 좋습니다.
좋은 질문이에요 ^^
화이팅입니다 😀
-1
안녕하세요, 인프런 AI 인턴입니다.
지구본님이 작성하신 코드를 보면, dropDuplicates() 메서드를 통해 중복을 제거한 후 persist() 메서드를 사용하여 데이터프레임을 캐시에 저장하셨습니다. 이는 특정 작업을 반복적으로 수행하거나 데이터를 여러 번 사용해야 할 때 매우 유용한 접근 방법입니다. persist()는 데이터가 메모리에 유지되어 이후의 연산에서 다시 계산하지 않도록 해 주므로 성능 향상에 도움이 될 수 있습니다.
하지만 모든 경우에 persist()를 사용해야 하는 것은 아닙니다. persist()는 메모리를 사용하기 때문에, 불필요하게 사용하면 메모리 낭비를 초래하여 시스템 자원을 과도하게 사용할 수 있습니다. 만약 중복을 제거한 후의 데이터프레임을 여러 번 사용하지 않거나, 한 번만 사용되며 크기가 크지 않다면, persist()를 사용하지 않고 필요할 때 새로 계산하는 것이 효율적일 수 있습니다.
강사님의 코드는 이런 자원 사용의 최적화를 고려하여 persist()를 사용하지 않은 것일 수 있습니다. 사용자가 자주 사용하는 경우에만 persist()를 사용하여 메모리를 효과적으로 활용하십시오. 적절한 장소에서의 캐싱은 성능 향상에 큰 도움이 될 수 있으므로 사용 시의 장단점을 잘 고려하여 적용하는 것이 좋습니다.
참고 링크:
- Spark Caching and Persistence
파이참 25.3.4 버전에서 한폴더에 두개 프로젝트 생성하는 방법은?
0
26
2
consumer가 모든 파티션을 읽지 않는 이슈가 있습니다
0
79
3
람다 아키텍처에서 speed layer 관련 질문드립니다
0
54
1
아키텍처 관련 질문
0
66
1
code deploy 배포에러
0
72
4
ch4_3 apache zookeeper 설치파일 관련 질문
0
83
3
질문있습니다 섹션13. Spark streaming 부분
0
65
2
ot 자료
0
65
2
git action - code deploy단계
0
94
2
IDE - Pycharm / VSCode
0
86
1
UI For Apache Kafka 설치하기
0
127
2
kafka broker prompt 변경 확인
0
76
2
디스코드 만료
0
68
2
Zookeeper 설치 오류
1
81
3
github 레파지토리 생성 및 연결 관련 질문드립니다!
0
55
2
git action실행 안됨
0
162
1
터미널에서 NAT 인스턴스로 직접 로그인 안됨
0
65
1
AWS EC2 생성하기 2분55초 부분
0
57
2
git action에서 install_codedeploy-agent.yml
0
53
2
consumer 실행 오류 질문: connect ip 관련
0
95
2
Spark Programs 구조에서 막혔습니다.
0
60
2
kafka-broker01 ping 반응 없습니다.
0
74
3
github actions 오류 문의 드립니다.
0
74
2
github Actions 실행 강의 따라가는도중 막혔습니다.
0
79
2

