강의

멘토링

커뮤니티

Cộng đồng Hỏi & Đáp của Inflearn

Hình ảnh hồ sơ của mjso
mjso

câu hỏi đã được viết

Ứng dụng kiến trúc microservice (MSA) phát triển với Spring Cloud

Microservice Orders được sửa đổi - Order Kafka Producer

kafka sink 생성 시 tasks state failed

Viết

·

1.1K

0

안녕하세요 강사님!

강의 잘보고 있습니다.

강의시간 18:58에 sink 생성 후 status 확인 값에서 tasks.state 값이 failed라고 뜹니다.

그래서 connect 로그를 보면 아래와 같이 뜹니다.

[2023-05-02 00:04:12,636] ERROR [my-order-sink-connect|task-0] WorkerSinkTask{id=my-order-sink-connect-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:196)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:223)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:149)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:516)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:493)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:332)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
        at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:328)
        at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:88)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$4(WorkerSinkTask.java:516)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:173)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:207)
        ... 13 more

 

구글링 해봐도 원하는 답변들이 없어 남겨봅니다.

spring-bootjpa아키텍처spring-cloudkafkamsa

Câu trả lời 2

0

mj Song님의 프로필 이미지
mj Song
Người đặt câu hỏi

자답합니다...

이유는 모르겠으나 등록했던 connector, topic 전부 날리고 새로 시작했더니 되네요...

허허...

0

mj Song님의 프로필 이미지
mj Song
Người đặt câu hỏi

추가적으로 spring 단에서는 별다른 에러는 없습니다.

kafka-console-consumer로 orders topic 로그를 보면 정상적으로 나옵니다.

다만 db에 데이터가 적재가 안되는데 connector에 tasks가 작동 중이 아니라 그런 것 같습니다.

Hình ảnh hồ sơ của mjso
mjso

câu hỏi đã được viết

Đặt câu hỏi