Posts by strato721224


Q&A

Error when registering a sink connector

Teacher, while tracking down the cause of this error, I looked at where the Kafka plug-ins are installed and found that "guava-30.1.1-jre.jar" was missing from /usr/local/kafka/connector-plugin/confluentinc-kafka-connect-avro-converter/lib/, so I added it myself.

The schema was registered in the Schema Registry without problems, as shown below.

ubuntu@ip-172-31-46-33:~$ http http://localhost:8081/schemas
HTTP/1.1 200 OK
Content-Encoding: gzip
Content-Length: 195
Content-Type: application/vnd.schemaregistry.v1+json
Date: Mon, 13 Mar 2023 06:50:47 GMT
Vary: Accept-Encoding, User-Agent

[
  {
    "id": 2,
    "schema": "{\"type\":\"record\",\"name\":\"Key\",\"namespace\":\"source_mysqldb.hesdp_mgr.TB_ME_TERMS_BAS_TEST\",\"fields\":[{\"name\":\"TERMS_VER_ID\",\"type\":\"string\"}],\"connect.name\":\"source_mysqldb.hesdp_mgr.TB_ME_TERMS_BAS_TEST.Key\"}",
    "subject": "source_mysqldb.hesdp_mgr.TB_ME_TERMS_BAS_TEST-key",
    "version": 1
  }
]

Registration on the source side went through normally, but registering the sink connector fails as shown below.

curl --location --request POST 'http://localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data-raw '{
  "name": "mysql-sink-connector-test",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "source_mysqldb.hesdp_mgr.TB_ME_TERMS_BAS_TEST",
    "connection.url": "jdbc:mysql://lg-mysql:3306/aurora2",
    "connection.user": "sink",
    "connection.password": "sink1234",
    "table.name.format": "aurora2.TB_ME_TERMS_BAS_TEST",
    "insert.mode": "upsert",
    "pk.fields": "TERMS_VER_ID",
    "pk.mode": "record_key",
    "delete.enabled": "true",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://lg-schema01:8081",
    "value.converter.schema.registry.url": "http://lg-schema01:8081"
  }
}'

ubuntu@ip-172-31-0-29:/data/kafka-logs$ curl -X GET http://localhost:8083/connectors/mysql-sink-connector-test/status | jq '.'
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  2913  100  2913    0     0   176k      0 --:--:-- --:--:-- --:--:--  284k
{
  "name": "mysql-sink-connector-test",
  "connector": {
    "state": "RUNNING",
    "worker_id": "172.31.10.77:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "172.31.10.77:8083",
      "trace": "org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:223)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:149)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:513)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:493)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:332)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic source_mysqldb.hesdp_mgr.TB_ME_TERMS_BAS_TEST to Avro: \n\tat io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:124)\n\tat org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:88)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$3(WorkerSinkTask.java:513)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:173)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:207)\n\t... 13 more\nCaused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!\n\tat io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:244)\n\tat io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.<init>(AbstractKafkaAvroDeserializer.java:334)\n\tat io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:202)\n\tat io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:172)\n\tat io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)\n\t... 17 more\n"
    }
  ],
  "type": "sink"
}

Question 1. Is it correct for me to add the Guava file to the lib directory where the Kafka Connect Avro converter is installed?

Question 2. Is the error right above caused by something I left out when registering the sink connector?
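
For context on the "Unknown magic byte!" line in the trace: the Confluent Avro deserializer expects every key and value to begin with a 5-byte header, one magic byte of 0 followed by a 4-byte big-endian schema ID, and it raises this exception when the first byte is anything else. Data written without the Schema Registry serializer (plain JSON or strings, for example) fails this check. The registry listing in the post shows only a `-key` subject, so it may be worth comparing the source connector's value converter with the sink's, although the post does not show the source config. The sketch below only illustrates the header check; the function name `parse_confluent_header` and both sample payloads are hypothetical and are not taken from the topic in the post.

```python
import struct

MAGIC_BYTE = 0  # first byte of the Confluent Schema Registry wire format


def parse_confluent_header(payload: bytes):
    """Return the schema ID if payload starts with the Confluent header, else None."""
    # Header layout: 1 magic byte (0x00) + 4-byte big-endian schema ID.
    if len(payload) < 5 or payload[0] != MAGIC_BYTE:
        return None
    (schema_id,) = struct.unpack(">I", payload[1:5])
    return schema_id


# Hypothetical sample payloads (assumptions for illustration only).
avro_like = b"\x00\x00\x00\x00\x02" + b"\x06abc"  # header carrying schema ID 2
json_like = b'{"TERMS_VER_ID": "abc"}'           # plain JSON, no header

print(parse_confluent_header(avro_like))  # 2
print(parse_confluent_header(json_like))  # None
```

Running the same kind of check against the raw bytes of a record's key and value on the topic would show which of the two lacks the header, assuming the raw bytes are fetched with a consumer that does no deserialization.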
