append 모드 사용 시 집계
writeStream 에서 append 모드 사용 시 지원되지 않는 오류가 발생한다고 하셨는데 그럼 데이터를 추가하면서 집계를 하고 싶은 경우에는 어떻게 해야 하나요?
回答 2
1
안녕하세요 권지혁님,
핵심은 “Append 모드로는 ‘최종값만 확정된 행’만 쓸 수 있다”는 점이에요. 집계(aggregation)는 원래 값이 계속 변하므로, 최종값을 판정할 기준(이벤트 시간이 닫혔다 등)이 없으면 append가 불가합니다
하지만, 이벤트 타임 열과 워터마크를 설정하고, 시간 윈도우(또는 세션 윈도우) 로 집계하면, 워터마크를 넘어 늦게 올 데이터는 무시되고, 해당 윈도우의 결과를 최종값으로 확정할 수 있어 append가 됩니다.
from pyspark.sql import functions as F
events = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "broker:9092")
.option("subscribe", "events")
.load()
.selectExpr("CAST(value AS STRING) AS value",
"CAST(timestamp AS TIMESTAMP) AS ts")
)
agg = (
events
.withWatermark("ts", "15 minutes") # 늦게 오는 데이터 허용 한도
.groupBy(F.window("ts", "10 minutes"), F.col("value"))
.count()
.select( # 윈도우 컬럼 평탄화(선택)
F.col("value"),
F.col("count"),
F.col("window.start").alias("window_start"),
F.col("window.end").alias("window_end"),
)
)
query = (
agg.writeStream
.outputMode("append") # 윈도우가 ‘닫히면’ 최종 레코드만 append
.format("kafka")
.option("kafka.bootstrap.servers", "broker:9092")
.option("topic", "events_agg_10m")
.option("checkpointLocation", "/chk/agg_10m")
.start()
)왜 되나? 윈도우는 끝 시각이 있고, 워터마크가 지나면 “이 윈도우 값은 더 이상 바뀌지 않는다”라고 판단 가능 → 최종 레코드만 append.
즉, Kafka/File 등 append-only 싱크로 집계를 쓰고 싶다라고 하시면 withWatermark + window 집계 + outputMode("append")로 디자인 하시면 될 듯 합니다.
0
안녕하세요, 인프런 AI 인턴입니다. 권지혁님.
Spark Structured Streaming에서 append 모드는 지속적으로 들어오는 데이터 중에서 특정 조건을 만족하는 데이터만을 최종 출력으로 남기는 방식입니다. 따라서 집계나 상태 정보를 저장해야 하는 경우에는 지원되지 않습니다. 데이터를 추가하면서 집계를 하려면 다음과 같은 방법을 고려할 수 있습니다:
Micro-Batch 기반의 Update를 사용하는 방법
append모드 대신complete또는update모드를 사용할 수 있습니다.
complete모드는 전체 집계 결과를 매번 출력하며,update모드는 변화가 있는 살아있는 상태만 갱신합니다.
query = aggregated_df.writeStream \
.outputMode("complete") \
.format("console") \
.start()
이때 complete 모드를 사용하면 집계된 결과 전체를 매번 출력하기에 시스템 자원 사용이 많을 수 있으니 주의가 필요합니다.
관련된 자세한 방법들은 다음 링크에서 확인할 수 있습니다:
- append 모드와 stateful operations
질문에 대해 추가 설명이 필요하시거나 다른 부분이 궁금하신 점이 있으면 언제든지 문의해 주세요. 저도 질문자님의 이해를 돕고자 최선을 다하겠습니다.
저는 질문자님의 이해를 돕고자 지식공유자님의 기존 답변을 참고하여 유사한 답변 링크를 추천해드리고 있습니다. 현재 베타 기능으로 답변이 만족스럽지 않을 수 있는 점 양해 부탁드립니다. 🙏
추가적으로 궁금한 점이 있으시면, 이어서 질문해 주세요. 곧 지식공유자께서 답변해 주실 것입니다.
databricks에 cluster 메뉴가 없습니다.
0
49
2
주피터 사용 및 도커 설치
0
94
2
S3 폴더 구조에 따른 Static Partition Pruning, DPP 질문
0
77
2
broadcast Join과 boradcast + UDF 차이
0
63
2
CDC 기반 스파크 아키텍처 구현 방법이 궁금합니다.
0
108
2
BroadCast 지원 가능 메모리 문의
0
90
2
rdd, dataframe, spark sql 각각 언제 사용할까요?
0
189
2
강의 후반부에 나온 실습 코드는 어디서 받을 수 있나요?
0
203
3
cdc 기반의 스트리밍 데이터 처리
1
231
2
broadcast 코드 예시 관련 질문
0
129
2
재생에 문제가 생겼어요 라는 메세지가 자꾸 뜨네요
0
116
1
스트리밍 예제 실행 시 py4j.protocol.Py4JNetworkError 발생
0
215
2
Repartition과 Coalesce에 대해 궁금한 점이 있어요!
1
166
1
pyspark 현업에서 로컬 개발 및 운영
1
338
1
Spark Structured Streaming Gracefully shutdown 질문
1
448
2
PySpark Runtime Architecture 내용 중 질문이 있습니다!
1
214
1
docker 설치 이후에 터미널 여는 부분에서 막혔습니다
1
460
2
app-<timestamp>-<executor_id> 폴더가 spark-events 내부에 생기게 하고 싶습니다.
1
277
1
pyspark.SparkContext 실행 오류 관련 질문
2
380
1
스파크 아키텍쳐 관련 문의 드립니다.
1
353
2
3.0에서 추가된 shuffle_hash, shuffle_replicate_nl hint는 어떤 상황에서 쓰는게 좋을까요?
1
320
1
데이터 엔지니터의 현 트렌드가 궁금합니다
1
473
1
스칼라강의도 제작하신다고 하셨는데 계획하고 계실까요??
1
297
2
pyspark / spark 차이점 질문드립니다.
1
1161
1

