• 카테고리

    질문 & 답변
  • 세부 분야

    데이터 분석

  • 해결 여부

    미해결

spark_kafka 실행시 java.lang.IllegalArgumentException 에러

23.11.19 09:03 작성 조회수 353

1

Streaming에서 Kafka 데이타 추출하기 부분 진행하고 있는데요

spark_kafka.py 실행시 에러가 납니다. ㅠ

 

root@81599cbd6b8f:/opt/bitnami/spark/work# spark-submit --master spark://spark:7077 --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1 spark_kafka.py

.....
        ---------------------------------------------------------------------
        |                  |            modules            ||   artifacts   |
        |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
        ---------------------------------------------------------------------
        |      default     |   11  |   11  |   11  |   0   ||   11  |   11  |
        ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-8f3a0b4c-b23d-4dfa-b9b0-8649735433fc
        confs: [default]
        11 artifacts copied, 0 already retrieved (56445kB/64ms)
23/11/18 23:57:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/11/18 23:57:24 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
23/11/18 23:57:24 WARN OffsetSeqMetadata: Updating the value of conf 'spark.sql.shuffle.partitions' in current session from '3' to '200'.
23/11/18 23:57:24 ERROR MicroBatchExecution: Query [id = 40288f62-daae-4e69-80db-ff6f83156268, runId = 535853f9-9153-44be-8eca-19f75ee8b4ea] terminated with error
java.lang.IllegalArgumentException: Expected e.g. {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}, got {"logOffset":2}
        at org.apache.spark.sql.kafka010.JsonUtils$.partitionOffsets(JsonUtils.scala:75)
        at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.deserializeOffset(KafkaMicroBatchStream.scala:216)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$getStartOffset$1(MicroBatchExecution.scala:454)
        at scala.Option.map(Option.scala:230)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.getStartOffset(MicroBatchExecution.scala:454)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$4(MicroBatchExecution.scala:489)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$2(MicroBatchExecution.scala:488)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
        at scala.collection.TraversableLike.map(TraversableLike.scala:286)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
        at scala.collection.AbstractTraversable.map(Traversable.scala:108)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:477)
        at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:802)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:473)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:266)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:247)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:237)
        at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:306)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:284)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:207)
Traceback (most recent call last):
  File "/opt/bitnami/spark/work/spark_kafka.py", line 38, in <module>
    query.awaitTermination()
  File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/sql/streaming/query.py", line 201, in awaitTermination
  File "/opt/bitnami/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py", line 175, in deco
pyspark.errors.exceptions.captured.StreamingQueryException: [STREAM_FAILED] Query [id = 40288f62-daae-4e69-80db-ff6f83156268, runId = 535853f9-9153-44be-8eca-19f75ee8b4ea] terminated with exception: Expected e.g. {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}, got {"logOffset":2}

답변 7

·

답변을 작성해보세요.

1

안녕하세요 Oh Suhyeon님,

제가 보기에는 Configuration 에러 같은데, 에러를 풀이해 보자면 Kafka 오프셋에 들어있는 데이터 포맷이 스파크 스트리밍에서 기대했던 것과 달리 잘 맞지 않는 것 같은데요. 연습하시는 거라면, 카프카 데이터를 다시 지우고 다시 데이터를 넣으신 다음에 시작하시면 문제가 해결될 듯 합니다.

프로덕션 데이터라면 하나씩 데이터를 열어보고 어떻게 데이터가 잘 못 되었는지 알아야 되는데, 그건 아닌 것 같으니, 데이터 다시 넣으시고 시작하시는게 나은 해결책 같네요.

 

지난 번에 리뷰 남기신 거 보고, 스트리밍 데이터 다시 보강해 놨으니 좋은 공부 되시고, 좋은 리뷰 부탁드립니다.

0

Oh Suhyeon님의 프로필

Oh Suhyeon

질문자

2023.11.25

혹시 git 자료 다시 받을수 있을까요 알려주신 git 주소로 보면 404 에러가 나서요 권한이 없는것 같아요

안녕하세요 Oh subyeon님 다시 한번 확인해 보시겠어요? 공개로 바꿔놨습니다

0

안녕하세요 Oh Suhyeon님,

 

혹시 Docker Desktop에 대한 문제가 아닐까요? 윈도우에서는 네트워크상 문제가 잘 일어난다고 듣기는 했는데... 포트를 바꾸셔도 같은 에러라면 컨테이너끼리 서로 통신에 문제가 있는 것 같은데요.

혹시 제가 말씀 드렸던대로 nc 명령어 사용해 보셨나요?

root@dadc6458f9fe:/opt/bitnami/spark# nc -vz kafka 9092
Connection to kafka (172.30.0.3) 9092 port [tcp/*] succeeded!

답변에서 Kafka에서 read를 하셨다고 하는데, Spark Container에서 해 보신건가요?

그것도 아니라면, Spark나 Kafka 버전 문제일까요? 버전 매치가 꽤 중요하거든요... 다시 한번 예제 코드와 확인해 보시기 바랍니다.

마지막으로, 혹시 코드가 잘 못 되었나 싶어서 다시 실행을 해봤는데, 아무런 문제 없이 돌아가는데... 흠... 도무지 알 수가 없네요.

0

Oh Suhyeon님의 프로필

Oh Suhyeon

질문자

2023.11.19

네 volume으로 잡혀있는것을 확인하고 폴더를 지우고 도커컴포즈도 재시작 했습니다 ㅠ

호스트와 포트를 바꾼 이유는 실제로 데이터가 이상해서 에러가 난지 확인하기 위함이었습니다. 에러문구나 강사님에 처음에 답변주신내용은 데이터가 기대했던거랑 다르다라는 내용이라고 하였으나, kafka consumer.sh로 실행해봤을 때 데이터에는 특이사항이 없었습니다. ㅠ 그래서 혹시 통신이 안됐는데 데이터가 이상하다고 에러 뱉는건 아닐까 의심이 들어 포트를 일부러 이상한 값을 넣어서 테스트 해본것입니다. 카프카랑 아예상관없는 포트를 넣어도 에러내용은 동일해, 데이터 이슈는 아닌것 같네요 ㅠㅠ kafka 호스트랑 포트는 read로 했을때 잘되서 통신도 아닌것 같구 다른설정문제일까요 ㅠ

0

안녕하세요 Oh suhyeon님,

Docker Compose를 다시 시작하는 것은 의미가 없습니다. 왜냐하면 제가 데이터를 보존하기 위해서 Docker Compose Kafka에 볼륨을 설정해 놓았기 때문에 그 디렉토리를 지우셔야 Kafka의 데이터가 지워집니다.. kafka-persistence 디렉토리안에 있는 모든 파일을 지우고 다시 시작하시면 됩니다.

https://github.com/dimz119/learn-pyspark/blob/main/docker-compose.yml#L56

Docker Compose는 어플리케이션들을 다시 시작하는 거지 데이타까지 초기화하는 것이 아닙니다.

마지막으로 제가 설정한 Docker Compose를 그대로 쓰실거라면 호스트와 포트 번호는 바꾸시면 안됩니다. 이는 Docker Compose에 아래 링크처럼 설정되어 있기 때문입니다.

호스트: https://github.com/dimz119/learn-pyspark/blob/main/docker-compose.yml#L53

포트번호: https://github.com/dimz119/learn-pyspark/blob/main/docker-compose.yml#L61

그래도 카프카 포트를 확인 하시고 싶으시다면 Spark 다커로 SSH 하신 후, netcat이 설치 후 nc -vz kafka 9092를 하시면 포트가 오픈되어 있는지 확인 가능합니다.

0

Oh Suhyeon님의 프로필

Oh Suhyeon

질문자

2023.11.19

그런데 이상하네요 read는 잘되는데 readStream만 안되서 ㅠ

아래처럼 kafka 포트 번호 나 호스트 다른걸로 넣어도 동일한 에러가 발생해서 kafka 토픽을 실제로 안보고 있는것 같아서요

events = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092111")
    .option("subscribe", "quickstart")
    # .option("startingOffsets", "earliest")
    .load()
)

0

Oh Suhyeon님의 프로필

Oh Suhyeon

질문자

2023.11.19

빠륻답변감사합니다 도커컴포즈 자체를 재시작했는데도 동일합니다. 키프카에 topic을 생성하지 않고 실행햇을때 동일한 에러가 나는것으로보아 spark에서 카프카를 제대로 바라보고 있는지 의문이 드는데 확인할수있는방법 없을까요 ㅠ