inflearn logo
강의

강의

N
챌린지

챌린지

멘토링

멘토링

N
클립

클립

로드맵

로드맵

지식공유

Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)

Kafka Sink Connect 사용

sink connect 생성시 fail 발생문제

1813

권윤경

작성한 질문수 4

0

WorkerSinkTask{id=my-sink-connect3-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask) org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178) at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104) at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:495) at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:472) at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328) at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232) at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834) Caused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration. at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:370) at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87) at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:495) at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128) at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162) ... 13 more [2022-03-06 11:22:24,134] ERROR WorkerSinkTask{id=my-sink-connect3-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
 
 
 
 
sink connect 생성시 위와 같은오류가 발생합니다.
구글링 하여 connect-standalone 파일의 설정을 false로 하여도 동일하게 오류 발생하고 해결을 못하고 있습니다.
 
key.converter.schemas.enable=false
value.converter.schemas.enable=false

JPA spring-boot architecture spring-cloud msa Kafka

답변 1

0

Dowon Lee

안녕하세요, 이도원입니다. 

sink connect의 역할은 Kafka Topic에 저장된 메시지를 지정된 Target(DB, S3, 기타 storage ...)으로 보내주는 역할을 합니다. 이때, DB로 메시지를 전달하기 위해서는 전달하는 메시지가 저장될 DB의 Schema에 맞는 형식이어야 합니다. 올려주신 에러 메시지로 유추해 보면, schema 부분과 payload 부분이 잘못되지 않았나 싶습니다. 아래의 json 포맷으로 메시지가 생성되는지 확인해 보고 테스트해 보시기 바랍니다. 

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"user_id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"pwd"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"createAt"}],"optional":false,"name":"users"},"payload":{"id":4,"user_id":"user54","name":"User4","pwd":"test5555","createAt":1613877255000}}

 

감사합니다. 

1

윤원석

Kafka 에서 Sink 대상 Topic 를 삭제하고, 다시 실행 해 보세요.

초기화 작업이 필요합니다.

다시 Source 등록하고, Sink 연동 을 시나리오에 의해서

처음 부터 다시 해야 합니다.

 

kafka 업데이트 강의 듣고 시포요

0

85

1

강의 교안

0

75

1

마이크로서비스간 통신 시, 인증 처리

0

79

1

api gateway 에서 인증 처리

0

66

1

섹션 19 질문드립니다

0

54

1

강의 자료 업데이트

0

84

1

부하분산 강의 섹션

0

57

1

강의자료는 어디에서?

0

72

1

강의 자료는 어디서 다운 받을 수 있나요?

0

110

1

전체 사용자 조회시 오류

0

59

1

혹시 pk 외 별도의 id 를 부여한 이유가 있을까요 ??

0

113

2

학습 방향

0

96

2

카프카 커넥터 사용 목적 문의

0

86

2

kafka 강의

0

109

2

서비스 디스커버리 종류

0

87

2

강의 자료에 대해서 궁금해요

0

117

2

GlobalFilter, LoggingFilter가 동작하지 않습니다.

0

90

2

Kafka Source Connect 버전 에러

0

85

2

소스커넥터는 사용안한 거 맞죠?

0

81

2

강의자료 업데이트 문의

0

96

2

강의에서 BCryptPasswordEncoder 에 역할(5-2)

0

57

1

강의 업데이트 계획이 궁금합니다.

0

113

2

MSA 애플리케이션에 Spring Web과 Spring Data JPA를 사용하는 것이 바람직한지 궁금합니다. (MSA 설계와 관련된 질문입니다)

0

163

2

어떤 것이 업데이트 된 건가요?

0

164

2