커넥트 에러 관련
930
작성한 질문수 4
안녕하세요. 강사님
좋은 강의 잘 듣고 있습니다. 카프카 관련해서 아래와 같은
에러가 발생하여 질문 드립니다.
에러 상황은 sql insert문으로 데이터가 잘 들어가졌는데,
producer를 이용하여 json구조의 데이터를 넣은 후 부터 정
상적으로 동작하지 않고, sink의 status를 보면 아래와 같은 에
러가 보입니다.
원인과 해결방법을 알려주시면 감사하겠습니다.
에러 -

json구조 데이터 -
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"user_id"},{"type":"string","optional":true,"field":"pwd"},{"type":"string","optional":true,"field":"name"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"created_at"}],"optional":false,"name":"users"},"payload":{"id":9,"user_id":"admin12","pwd":"1111234","name":"admin","created_at":1628604236000}}
답변 3
2
안녕하세요. 강사님
관련하여 위 에러는 producer에서
위 형식이 아닌 다른 형식으로 메시지를 보내게 되면
db insert를 하지 못하는 에러가 발생하고,
그 후에 정상적인 메시지형태(db insert할 수 있는)로 보내더라도 기존에 잘못 보냈던 메시지가 처리되지 않아 발생하는 에러 인 것으로 보입니다.
sink, source를 지우고 producer에서 정상적인 형태로만 메시지를 보내보니 디비에 정상 저장되는 것을 확인하였는데,
그렇다면 위 처럼 잘못된 메시지가 온 경우 이를 무시하거나 삭제하고 다음 메시지를 이어서 처리하는 방법이 있을까요?
0
안녕하세요. 강사님
파일 첨부가 안돼서 아래에 텍스트로 첨부하였습니다.
불편하시겠지만 확인 부탁드리겠습니다.
감사합니다.
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\r\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)\r\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)\r\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:495)\r\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:472)\r\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)\r\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)\r\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)\r\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)\r\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)\r\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\r\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\r\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\r\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\r\n\tat java.lang.Thread.run(Thread.java:748)\r\nCaused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.\r\n\tat org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:370)\r\n\tat org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)\r\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:495)\r\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)\r\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)\r\n\t... 13 more\r\n
0
안녕하세요, 이도원입니다.
query를 이용해서 처리했을 때는 sink connect도 잘 작동되지만, Producer를 이용하였을 때는 오류가 발생한다는 문제는 Producer에서 전달하는 message가 DB에 저장될 수 있는 형식이 아니어서 그런것 같습니다. 위에 올려주신 에러 메시지는 확인하기에 너무 작네요.(ㅡ.ㅡ) 전체 에러 메시지를 텍스트 형식이어도 좋으니 다시 한번 첨부해 주시면 확인하는데에 도움이 될 것 같습니다.
감사합니다.
kafka 업데이트 강의 듣고 시포요
0
114
2
강의 교안
0
105
2
마이크로서비스간 통신 시, 인증 처리
0
116
2
api gateway 에서 인증 처리
0
76
1
섹션 19 질문드립니다
0
84
2
강의 자료 업데이트
0
105
2
부하분산 강의 섹션
0
66
1
강의자료는 어디에서?
0
98
2
강의 자료는 어디서 다운 받을 수 있나요?
0
131
2
전체 사용자 조회시 오류
0
65
1
혹시 pk 외 별도의 id 를 부여한 이유가 있을까요 ??
0
120
2
학습 방향
0
105
2
카프카 커넥터 사용 목적 문의
0
91
2
kafka 강의
0
118
2
서비스 디스커버리 종류
0
90
2
강의 자료에 대해서 궁금해요
0
125
2
GlobalFilter, LoggingFilter가 동작하지 않습니다.
0
96
2
Kafka Source Connect 버전 에러
0
95
2
소스커넥터는 사용안한 거 맞죠?
0
84
2
강의자료 업데이트 문의
0
99
2
강의에서 BCryptPasswordEncoder 에 역할(5-2)
0
63
1
강의 업데이트 계획이 궁금합니다.
0
120
2
MSA 애플리케이션에 Spring Web과 Spring Data JPA를 사용하는 것이 바람직한지 궁금합니다. (MSA 설계와 관련된 질문입니다)
0
168
2
어떤 것이 업데이트 된 건가요?
0
169
2





