inflearn logo
강의

강의

N
챌린지

챌린지

멘토링

멘토링

N
클립

클립

로드맵

로드맵

지식공유

Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)

Orders Microservice와 Catalogs Microservice에 Kafka Topic의 적용

카프카 sink connector 사용시 에러

636

뉴비

작성한 질문수 2

0

안녕하세요. 카프카 관련 수업 듣는 중 오류가 발생하여 질문 드립니다.

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:501)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:478)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\tat java.base/java.lang.Thread.run(Thread.java:833)\nCaused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error: \n\tat org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:366)\n\tat org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertValue(WorkerSinkTask.java:545)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:501)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)\n\t... 13 more\nCaused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unexpected character (':' (code 58)): was expecting comma to separate Object entries\n at [Source: (byte[])\"{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"user_id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"pwd\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"},{\"type\":\"int64\",\"optional\":true,\"name\":\"org.apache.kafka.connect.data.Timestamp\",\"version\":1,\"field\":\"created_at\"}],\"optional\":false,\"name\":\"users\"},\"payload\":{\"id”:4,”user_id\":\"user4”,”pwd\":\"1234\",\"name\":\"username4”,”created_at\":1671277849000}}\"; line: 1, column: 433]\nCaused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character (':' (code 58)): was expecting comma to separate Object entries\n at [Source: (byte[])\"{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"user_id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"pwd\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"},{\"type\":\"int64\",\"optional\":true,\"name\":\"org.apache.kafka.connect.data.Timestamp\",\"version\":1,\"field\":\"created_at\"}],\"optional\":false,\"name\":\"users\"},\"payload\":{\"id”:4,”user_id\":\"user4”,”pwd\":\"1234\",\"name\":\"username4”,”created_at\":1671277849000}}\"; line: 1, column: 433]\n\tat com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1840)\n\tat com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:712)\n\tat com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:637)\n\tat com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextFieldName(UTF8StreamJsonParser.java:1011)\n\tat com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer.deserializeObject(JsonNodeDeserializer.java:250)\n\tat com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer.deserializeObject(JsonNodeDeserializer.java:258)\n\tat com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:68)\n\tat com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:15)\n\tat com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4270)\n\tat com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2734)\n\tat org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:64)\n\tat org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:364)\n\tat org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertValue(WorkerSinkTask.java:545)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:501)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:501)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:478)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\tat java.base/java.lang.Thread.run(Thread.java:833)\n

발생 에러는 다음과 같은데,

터미널에서 직접 produce 하는 도중 발생하였는데 payload 의 데이터가 잘못 전송 되어서 해당 오류가 발생 한 것 같아서 기존 users 테이블에 다시 insert 하는 방식으로 다시 사용했을 때 이전에 실패한 task 가 남아있어서 여전히 my_topic_users 테이블에 insert 되지 않았습니다.

중간에 발생한 task 는 삭제하거나 임의로 건너 뛰거나 할 수 없는 것인가요?

kafka JPA spring-boot architecture spring-cloud Kafka msa

답변 0

kafka 업데이트 강의 듣고 시포요

0

91

2

강의 교안

0

80

2

마이크로서비스간 통신 시, 인증 처리

0

87

2

api gateway 에서 인증 처리

0

67

1

섹션 19 질문드립니다

0

60

2

강의 자료 업데이트

0

87

2

부하분산 강의 섹션

0

59

1

강의자료는 어디에서?

0

78

2

강의 자료는 어디서 다운 받을 수 있나요?

0

114

2

전체 사용자 조회시 오류

0

60

1

혹시 pk 외 별도의 id 를 부여한 이유가 있을까요 ??

0

113

2

학습 방향

0

96

2

카프카 커넥터 사용 목적 문의

0

87

2

kafka 강의

0

109

2

서비스 디스커버리 종류

0

87

2

강의 자료에 대해서 궁금해요

0

119

2

GlobalFilter, LoggingFilter가 동작하지 않습니다.

0

91

2

Kafka Source Connect 버전 에러

0

90

2

소스커넥터는 사용안한 거 맞죠?

0

82

2

강의자료 업데이트 문의

0

97

2

강의에서 BCryptPasswordEncoder 에 역할(5-2)

0

59

1

강의 업데이트 계획이 궁금합니다.

0

115

2

MSA 애플리케이션에 Spring Web과 Spring Data JPA를 사용하는 것이 바람직한지 궁금합니다. (MSA 설계와 관련된 질문입니다)

0

163

2

어떤 것이 업데이트 된 건가요?

0

167

2