Inflearn brand logo image

인프런 커뮤니티 질문&답변

jongphago님의 프로필 이미지
jongphago

작성한 질문수

Kafka & Spark 활용한 Realtime Datalake

DataFrame 파티션

[과제10-3] count 하지 않을때 `persist` 함수의 의미와 순서도에서 `scan csv`가 세번 나온 이유

작성

·

44

·

수정됨

0

질문

  1. 만약 문제에서 카운트를 출력하라는 조건이 없었으면 persist를 호출 하지않아도 같은 퍼포먼스가 나오나요? (문제: ... 각각 데이터를 로드한 후 카운트를 출력하고,...)


    count 함수를 제외하면 action 계열 함수가 없는데 count 함수가 없는 경우에도 persist가 유의미 한지 궁금합니다.

     

  2. [SQL/DataFrame] - [Details for Query0] 순서도(마지막에 첨부) 에서 Scan csv 가 3번 나타나는 이유는 무엇인가요??
    csv 파일을 읽어오는 함수는 두번 실행했는데 왜 3번 실행되는지 궁금합니다.

     

참고

  • 목차

    • 질문-참고-코드-순서도

  • 코드 구조

  1. HDFS 경로 정의

  2. 스키마 정의

  3. 데이터프레임 생성

  4. time_recorded에 의한 중복열 제거

  5. employee_count 컬럼 추가를 위한 join

  6. IT Services and IT Consulting 회사 추출

  7. 두 테이블 결합

  8. 결과 출력

  • dropDuplicate가 정확히 어떤 행을 drop 하는지 몰라서 egg-max 조합으로 중복 행을 제거함

코드

"""
>>> company_industries_df.printSchema()
root
 |-- company_id: string (nullable = true)
 |-- industry: string (nullable = true)

>>> company_industries_df.show(5)
+----------+--------------------+
|company_id|            industry|
+----------+--------------------+
|    391906|Book and Periodic...|
|  22292832|        Construction|
|     20300|             Banking|
|   3570660|Book and Periodic...|
|    878353|Staffing and Recr...|
+----------+--------------------+
only showing top 5 rows

>>> employee_counts_df.printSchema()
root
 |-- company_id: string (nullable = true)
 |-- employee_count: string (nullable = true)
 |-- follower_count: string (nullable = true)
 |-- time_recorded: string (nullable = true)

>>> employee_counts_df.show(5)
+----------+--------------+--------------+-------------+
|company_id|employee_count|follower_count|time_recorded|
+----------+--------------+--------------+-------------+
|    391906|           186|         32508|   1712346173|
|  22292832|           311|          4471|   1712346173|
|     20300|          1053|          6554|   1712346173|
|   3570660|           383|         35241|   1712346173|
|    878353|            52|         26397|   1712346173|
+----------+--------------+--------------+-------------+
only showing top 5 rows


+----------+--------------------+--------------+--------------+-------------+
|company_id|            industry|employee_count|follower_count|time_recorded|
+----------+--------------------+--------------+--------------+-------------+
|      1353|IT Services and I...|        596046|      14829798|   1713536641|
|      1353|IT Services and I...|        595935|      14822564|   1713448937|
|      1353|IT Services and I...|        595844|      14817360|   1713407024|
|      1353|IT Services and I...|        595782|      14807439|   1713279321|
|      1353|IT Services and I...|        595669|      14800204|   1713208435|
+----------+--------------------+--------------+--------------+-------------+
only showing top 5 rows

"""

import time
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col, max

spark = SparkSession.builder.getOrCreate()

# 1. 파일 경로 정의
company_industries_path = (
    "/home/spark/sample/linkedin_jobs/companies/company_industries.csv"
)
employee_counts_path = "/home/spark/sample/linkedin_jobs/companies/employee_counts.csv"
# 2. 스키마 정의
company_industries_schema = StructType(
    [
        StructField("company_id", IntegerType(), True),
        StructField("industry", StringType(), True),
    ]
)
employee_counts_schema = StructType(
    [
        StructField("company_id", IntegerType(), True),
        StructField("employee_count", IntegerType(), True),
        StructField("follower_count", IntegerType(), True),
        StructField("time_recorded", IntegerType(), True),
    ]
)

# 3. 데이터프레임 생성
company_industries_df = (
    spark.read.option("header", "true")
    .option("multiLine", "true")
    .schema(company_industries_schema)
    .csv(company_industries_path)
)

employee_counts_df = (
    spark.read.option("header", "true")
    .option("multiLine", "true")
    .schema(employee_counts_schema)
    .csv(employee_counts_path)
)


# 4. 중복 제거: groupby(company_id) -> agg -> max(time_recorded)
agg_latest_employee_counts_df = employee_counts_df.groupBy("company_id").agg(
    max("time_recorded").alias("time_recorded")
)


# 5. employee_count 컬럼 추가
latest_employee_counts_df = agg_latest_employee_counts_df.join(
    employee_counts_df,
    on=["company_id", "time_recorded"],
    how="inner",
)


# 6. 필터링: 업종이 IT Services and IT Consulting인 회사 추출
industry = "IT Services and IT Consulting"
it_services_company_industries_df = company_industries_df.filter(
    col("industry") == industry
)

# 7.조인: 회사 정보와 종업원 수 결합 및 필터링: 종업원수 1000명 이상 및 정렬: 종업원수 내림차순 정렬
result_df = (
    it_services_company_industries_df.join(
        latest_employee_counts_df,
        on="company_id",
        how="inner",
    )
    .filter(col("employee_count") >= 1000)
    .orderBy(col("employee_count").desc())
)

# 8. 'company_id', 'employee_count', 컬럼만 출력
result_df.select("company_id", "employee_count").show()

time.sleep(300)

순서도

이미지가 압축되어 올라가 [링크] 첨부합니다.

dataframe_cache.py - Details for Query 0.jpg.webp

 

답변 2

0

김현진님의 프로필 이미지
김현진
지식공유자

안녕하세요 jongphago 님!

답글이 좀 늦었습니다.

질문에 대해 답변드리자면 질문 1과 2는 서로 연관 있습니다.

한마디로 작성하신 코드에 persist가 없기 때문에 employee_counts_df 를 두 번 읽어들인 것입니다.

조금 더 자세히 말씀드리자면

  1. 원래 과제에 중간 중간 있던 count() 액션 함수를 쓰지 않더라도 persist()는 도움이 됩니다.
    persist() 후 함수 연산을 쭉 작성했을 때 비록 액션 함수를 만났을 때 비로소 Cache로 올리지만 Dataframe에 대한 실행 계획은 Cache 에서 가져오는걸로 바뀌게 됩니다.


    즉 올려주신 코드에서 보면 맨 마지막에 show() 라는 액션 함수가 있습니다.

    따라서 비록 show()를 만나면서 실제 연산이 시작되고 (만약 persist가 있었다면) 캐싱도 되겠지만
    show()가 시작되기 전의 데이터프레임 실행계획을 한번 까보시면 cache에서 가져와서 처리하는 실행 계획을 보실 수 있을 거에요.
    (Spark 실행 계획 보는 방법에 대해서는 해당 섹션 뒷부분에 나옵니다)

  2. 지금 작성하신 코드에서 persist()가 하나도 없습니다.
    따라서 employee_counts_df 데이터프레임에서 중복제거를 통해 agg_latest_employee_counts_df 데이터프레임을 만들었지만 이 작업을 마치는 순간 employee_counts_df 데이터프레임은 메모리에서 사라집니다. (persist를 안해놨기 때문에)

    따라서 agg_latest_employee_counts_df 와 employee_counts_df 조인을 하려고 할 때
    employee_counts_df 를 다시 read_csv 로 읽어와야 하는 것이죠.

    따라서 employee_counts_df 를 2번, company_industries_df 를 1번 읽어들인 겁니다.
    아래처럼 employee_counts_df 를 로드하고 나서 바로 persist() 해보세요.

employee_counts_df = (
    spark.read.option("header", "true")
    .option("multiLine", "true")
    .schema(employee_counts_schema)
    .csv(employee_counts_path)
)
employee_counts_df.persist() #  persist 수행

agg_latest_employee_counts_df = employee_counts_df.groupBy("company_id").agg(
    max("time_recorded").alias("time_recorded")
)


# 5. employee_count 컬럼 추가
latest_employee_counts_df = agg_latest_employee_counts_df.join(
    employee_counts_df,
    on=["company_id", "time_recorded"],
    how="inner",
)
explain(latest_employee_counts_df)  # 실행계획 보기 


이렇게 해보시면 latest_employee_counts_df 에 대한 실행 계획이 나올 텐데 지금은 보기 좀 어려우실 수 있지만 employee_counts_df 를 Cache 에서 가져온다는 계획이 보일거에요.

이해가 되셨나요?

궁금하신 건 언제든 질문주세요 ^^

 

0

안녕하세요, 인프런 AI 인턴입니다. jongphago님!

질문하신 내용을 바탕으로 persistScan csv 횟수에 대해 설명드리겠습니다.

1. persist 함수의 의미

persist 함수는 DataFrame이나 RDD를 메모리나 디스크에 캐시하여 재사용 시 I/O 성능을 향상시킵니다. 이 함수는 데이터가 반복적으로 액세스되거나 액션(action) 연산이 수행될 때 유효합니다.

  • 카운트 액션 유무: count와 같은 액션 함수가 없으면, 그 자체로는 persist를 호출하지 않아도 직접적인 성능 차이는 보이지 않을 수 있습니다. 그러나 만약 여러 연산에서 동일한 DataFrame을 참조한다면, persist를 사용하여 연산 효율성을 높일 수 있습니다.

  • count가 없더라도: 다른 액션이 발생할 때 persist는 의미가 있으며, 중복 계산을 줄이는 데 도움이 되지만 심각한 성능 차이는 아닐 수 있습니다.

2. Scan csv가 세 번 나타나는 이유

  • 여러 번의 데이터 접근: Spark에서 데이터 분석 시, 동일한 파일을 여러 변환 과정에서 반복적으로 액세스할 수 있습니다. 이는 다양한 DataFrame 연산에서 같은 데이터 소스를 여러 번 스캔할 수 있음을 의미합니다.

  • 최적화 연산: Spark의 Catalyst 옵티마이저는 쿼리를 최적화하면서 여러 스캔을 요구할 수 있으며, 이는 중복된 데이터를 피하거나 효율적으로 처리하기 위함입니다.

관련 자료 참고

보다 구체적인 예시와 설명을 통해 이해도를 높이기 위해 PySpark의 persistcache 사용법에 대한 튜토리얼을 자주 참고하실 수 있습니다. Scan csv와 관련된 계획을 이해하기 위해 Spark UI에서 DAG 시각화를 보시면 도움이 될 수 있습니다.

저는 질문자님의 이해를 돕고자 지식공유자님의 기존 답변을 참고하여 유사한 답변 링크를 추천해드리고 있습니다.
현재 베타 기능으로 답변이 만족스럽지 않을 수 있는 점 양해 부탁드립니다. 🙏
추가적으로 궁금한 점이 있으시면, 이어서 질문해 주세요. 곧 지식공유자께서 답변해 주실 것입니다.

jongphago님의 프로필 이미지
jongphago

작성한 질문수

질문하기