• 카테고리

    질문 & 답변
  • 세부 분야

    백엔드

  • 해결 여부

    미해결

sink connect 생성시 fail 발생문제

22.03.06 11:27 작성 조회수 1.33k

0

WorkerSinkTask{id=my-sink-connect3-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask) org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178) at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104) at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:495) at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:472) at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328) at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232) at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834) Caused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration. at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:370) at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87) at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:495) at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128) at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162) ... 13 more [2022-03-06 11:22:24,134] ERROR WorkerSinkTask{id=my-sink-connect3-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
 
 
 
 
sink connect 생성시 위와 같은오류가 발생합니다.
구글링 하여 connect-standalone 파일의 설정을 false로 하여도 동일하게 오류 발생하고 해결을 못하고 있습니다.
 
key.converter.schemas.enable=false
value.converter.schemas.enable=false

답변 1

답변을 작성해보세요.

0

안녕하세요, 이도원입니다. 

sink connect의 역할은 Kafka Topic에 저장된 메시지를 지정된 Target(DB, S3, 기타 storage ...)으로 보내주는 역할을 합니다. 이때, DB로 메시지를 전달하기 위해서는 전달하는 메시지가 저장될 DB의 Schema에 맞는 형식이어야 합니다. 올려주신 에러 메시지로 유추해 보면, schema 부분과 payload 부분이 잘못되지 않았나 싶습니다. 아래의 json 포맷으로 메시지가 생성되는지 확인해 보고 테스트해 보시기 바랍니다. 

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"user_id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"pwd"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"createAt"}],"optional":false,"name":"users"},"payload":{"id":4,"user_id":"user54","name":"User4","pwd":"test5555","createAt":1613877255000}}

 

감사합니다. 

윤원석님의 프로필

윤원석

2023.11.20

Kafka 에서 Sink 대상 Topic 를 삭제하고, 다시 실행 해 보세요.

초기화 작업이 필요합니다.

다시 Source 등록하고, Sink 연동 을 시나리오에 의해서

처음 부터 다시 해야 합니다.