강의

멘토링

로드맵

인프런 커뮤니티 질문&답변

권지혁님의 프로필 이미지
권지혁

작성한 질문수

실리콘밸리 엔지니어에게 배우는 파이썬 아파치 스파크

Structured Streaming Output Mode(아웃풋 모드)에 대해 자세히 알아보기

append 모드 사용 시 집계

해결된 질문

작성

·

9

0

writeStream 에서 append 모드 사용 시 지원되지 않는 오류가 발생한다고 하셨는데 그럼 데이터를 추가하면서 집계를 하고 싶은 경우에는 어떻게 해야 하나요?

답변 2

1

미쿡엔지니어님의 프로필 이미지
미쿡엔지니어
지식공유자

안녕하세요 권지혁님,

핵심은 “Append 모드로는 ‘최종값만 확정된 행’만 쓸 수 있다”는 점이에요. 집계(aggregation)는 원래 값이 계속 변하므로, 최종값을 판정할 기준(이벤트 시간이 닫혔다 등)이 없으면 append가 불가합니다

하지만, 이벤트 타임 열워터마크를 설정하고, 시간 윈도우(또는 세션 윈도우) 로 집계하면, 워터마크를 넘어 늦게 올 데이터는 무시되고, 해당 윈도우의 결과를 최종값으로 확정할 수 있어 append가 됩니다.

from pyspark.sql import functions as F

events = (
    spark.readStream
         .format("kafka")
         .option("kafka.bootstrap.servers", "broker:9092")
         .option("subscribe", "events")
         .load()
         .selectExpr("CAST(value AS STRING) AS value",
                     "CAST(timestamp AS TIMESTAMP) AS ts")
)

agg = (
    events
    .withWatermark("ts", "15 minutes")                   # 늦게 오는 데이터 허용 한도
    .groupBy(F.window("ts", "10 minutes"), F.col("value"))
    .count()
    .select(                                             # 윈도우 컬럼 평탄화(선택)
        F.col("value"),
        F.col("count"),
        F.col("window.start").alias("window_start"),
        F.col("window.end").alias("window_end"),
    )
)

query = (
    agg.writeStream
       .outputMode("append")                             # 윈도우가 ‘닫히면’ 최종 레코드만 append
       .format("kafka")
       .option("kafka.bootstrap.servers", "broker:9092")
       .option("topic", "events_agg_10m")
       .option("checkpointLocation", "/chk/agg_10m")
       .start()
)

왜 되나? 윈도우는 끝 시각이 있고, 워터마크가 지나면 “이 윈도우 값은 더 이상 바뀌지 않는다”라고 판단 가능 → 최종 레코드만 append.

즉, Kafka/File 등 append-only 싱크로 집계를 쓰고 싶다라고 하시면 withWatermark + window 집계 + outputMode("append")로 디자인 하시면 될 듯 합니다.

0

안녕하세요, 인프런 AI 인턴입니다. 권지혁님.

Spark Structured Streaming에서 append 모드는 지속적으로 들어오는 데이터 중에서 특정 조건을 만족하는 데이터만을 최종 출력으로 남기는 방식입니다. 따라서 집계나 상태 정보를 저장해야 하는 경우에는 지원되지 않습니다. 데이터를 추가하면서 집계를 하려면 다음과 같은 방법을 고려할 수 있습니다:

Micro-Batch 기반의 Update를 사용하는 방법

  • append 모드 대신 complete 또는 update 모드를 사용할 수 있습니다.
  • complete 모드는 전체 집계 결과를 매번 출력하며, update 모드는 변화가 있는 살아있는 상태만 갱신합니다.
query = aggregated_df.writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

이때 complete 모드를 사용하면 집계된 결과 전체를 매번 출력하기에 시스템 자원 사용이 많을 수 있으니 주의가 필요합니다.

관련된 자세한 방법들은 다음 링크에서 확인할 수 있습니다:
- append 모드와 stateful operations

질문에 대해 추가 설명이 필요하시거나 다른 부분이 궁금하신 점이 있으면 언제든지 문의해 주세요. 저도 질문자님의 이해를 돕고자 최선을 다하겠습니다.

저는 질문자님의 이해를 돕고자 지식공유자님의 기존 답변을 참고하여 유사한 답변 링크를 추천해드리고 있습니다. 현재 베타 기능으로 답변이 만족스럽지 않을 수 있는 점 양해 부탁드립니다. 🙏
추가적으로 궁금한 점이 있으시면, 이어서 질문해 주세요. 곧 지식공유자께서 답변해 주실 것입니다.

권지혁님의 프로필 이미지
권지혁

작성한 질문수

질문하기