• 카테고리

    질문 & 답변
  • 세부 분야

    데이터 분석

  • 해결 여부

    미해결

Spark Structured Streaming Gracefully shutdown 질문

24.05.07 23:26 작성 조회수 49

1

안녕하세요. 좋은 강의 재밌게 수강하고 있습니다.


Spark Structured Streaming Fault Tolerance 강의에서

아래와 같이 gracefully 하게 스트리밍을 종료할 수 있다고 말씀 주신 부분에서 질문이 있습니다.

.config("spark.streaming.stopGracefullyOnShutdown", "true")

현재 업무에서 Spark Streaming을 사용했을 때 아래 코드와 같이 파라미터에 명확하게 stopGracefully 이 존재하여서
이를 이용하여 스트리밍을 안전하게 종료했습니다.
def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit

 

하지만, Spark Strucutred Streaming으로 전환했을 때 이러한 파라미터가 존재하지 않아서, 아래 링크를 참고하여

직접 구현하였습니다.

 

https://stackoverflow.com/questions/45717433/stop-structured-streaming-query-gracefully

 

  1. 강의에서 알려주신 것처럼 아래와 같이 사용하면 동일하게
    Structured Streaming도 Gracefully하게 종료할 수 있다고 이해하면 될까요?

.config("spark.streaming.stopGracefullyOnShutdown", "true")

  1. Gracefully 스트리밍을 종료 한다라는 의미가 현재 처리 중인 마이크로 배치까지는 모두 다 처리 및 체크포인트 작성까지 한 후 스트리밍 종료로 이해하면 될까요?

  2. 마지막으로, DR 같이 스트리밍 종료가 아닌 클러스터가 모두 비정상적으로 종료되었을 경우 Gracefully 옵션이 적용되지 않는 케이스를 경험 했는데, 이런한 케이스는 현업에서 주로 어떻게 대처하고 있을까요?


    (예를 들어 체크 포인트 등이 불일치하게 스트리밍이 종료)

    감사합니다.

     

 

답변 2

·

답변을 작성해보세요.

1

안녕하세요 장원용님,

  • 강의에서 알려주신 것처럼 아래와 같이 사용하면 동일하게 Structured Streaming도 Gracefully하게 종료할 수 있다고 이해하면 될까요?

=> 제가 알기로는 spark.streaming.stopGracefullyOnShutdown는 RDD based-streaming에만 적용되는 걸로 알고 있고 어플리케이션에서 따로 처리해줘야 하는 걸로 알고 있습니다. 예를들면 아래와 같이 시그날을 받아서 query.stop()사용해주시면 될 듯 합니다.

import signal
import sys
from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession.builder.appName("StructuredStreamingApp").getOrCreate()

# Define a function to handle graceful shutdown
def graceful_shutdown(signum, frame):
    print("Graceful shutdown initiated")
    if 'query' in globals():
        query.stop()
    spark.stop()
    sys.exit(0)

# Register the signal handler for termination signals
signal.signal(signal.SIGTERM, graceful_shutdown)
signal.signal(signal.SIGINT, graceful_shutdown)

# Define the streaming DataFrame
streaming_df = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()

# Process the DataFrame
processed_df = streaming_df.selectExpr("CAST(value AS STRING) as message")

# Define the query
query = processed_df.writeStream.format("console").outputMode("append").start()

# Await termination of the query
try:
    query.awaitTermination()
except Exception as e:
    print("Exception in streaming query:", e)
finally:
    # Ensure the query is stopped when exiting
    query.stop()
  • Gracefully 스트리밍을 종료 한다라는 의미가 현재 처리 중인 마이크로 배치까지는 모두 다 처리 및 체크포인트 작성까지 한 후 스트리밍 종료로 이해하면 될까요?

=> 네, 맞습니다.

  • 마지막으로, DR 같이 스트리밍 종료가 아닌 클러스터가 모두 비정상적으로 종료되었을 경우 Gracefully 옵션이 적용되지 않는 케이스를 경험 했는데, 이런한 케이스는 현업에서 주로 어떻게 대처하고 있을까요?


    (예를 들어 체크 포인트 등이 불일치하게 스트리밍이 종료)

=> 일단 .option("checkpointLocation", "/path/to/checkpoint/dir") 적용해서 체크포인트 적용해주시고, 이게 잘 작동하지 않을 것을 대비해 Kafka나 Kinesis에서 오프셋을 항상 추적하고 한시간 전 정도 Rewind시켜서 다시 시작하시면 될 듯 합니다. 그리고 Lambda 아키텍쳐를 사용하셔서 항상 Day가 끝나면 배치로 다시 돌려서 다시 잡아주는 것도 좋은 방법인 듯합니다.

0

장원용님의 프로필

장원용

질문자

2024.05.14

자세한 답변 감사합니다!

그럼 강의에서 사용한 spark.streaming.stopGracefullyOnShutdown 옵션은 rdd based 이기 때문에 strucutred streaming은 위 방법으로 적용한다고 이해하면 될까요?

아래 설명해주신 부분을 잘 이해못한 것 같은데, 조금 더 자세히 설명 주실 수 있을까요?
spark에서 기록한 오프셋 정보와 실제 kafak 및 kinesis에 커밋한 오프셋 정보와 불일치할 것을 대비한게 맞을까요?

 

이게 잘 작동하지 않을 것을 대비해 Kafka나 Kinesis에서 오프셋을 항상 추적하고 한시간 전 정도 Rewind시켜서 다시 시작하시면 될 듯 합니다. 그리고 Lambda 아키텍쳐를 사용하셔서 항상 Day가 끝나면 배치로 다시 돌려서 다시 잡아주는 것도 좋은 방법

감사합니다!

그럼 강의에서 사용한 spark.streaming.stopGracefullyOnShutdown 옵션은 rdd based 이기 때문에 strucutred streaming은 위 방법으로 적용한다고 이해하면 될까요?

=> 네 그렇습니다.

spark에서 기록한 오프셋 정보와 실제 kafak 및 kinesis에 커밋한 오프셋 정보와 불일치할 것을 대비한게 맞을까요?

=> 네 그렇습니다. 말그래도 두번째 백업을 해 놓는 거랑 비슷합니다. 아무래도 읽는 Consumer와 Broker 사이에도 오프셋이 다를 수도 있으니까요.

 

도움이 되셨으면 좋은 리뷰 부탁드릴게요! ㅎㅎ