Sink connect 등록시 에러가 납니다.
안녕하세요. 선생님
Confluent AVRO Install 하고 나서 sink connect 등록하고 상태를 확인 해보니 아래와 같이 에러가 발생 했습니다.
ubuntu@ip-172-31-0-29:~$ curl --location --request POST 'http://localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data-raw '{
"name": "mysql-sink-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "source_mysqldb.hesdp_mgr.TB_ME_TERMS_BAS",
"connection.url": "jdbc:mysql://lg-mysql:3306/aurora2",
"connection.user": "sink",
"connection.password": "sink1234",
"table.name.format": "aurora2.TB_ME_TERMS_BAS",
"insert.mode": "upsert",
"pk.fields": "TERMS_VER_ID",
"pk.mode": "record_key",
"delete.enabled": "true",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://lg-schema01:8081",
"value.converter.schema.registry.url": "http://lg-schema01:8081"
}
}'
{"name":"mysql-sink-connector","config":{"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector","tasks.max":"1","topics":"source_mysqldb.hesdp_mgr.TB_ME_TERMS_BAS","connection.url":"jdbc:mysql://lg-mysql:3306/aurora2","connection.user":"sink","connection.password":"sink1234","table.name.format":"aurora2.TB_ME_TERMS_BAS","insert.mode":"upsert","pk.fields":"TERMS_VER_ID","pk.mode":"record_key","delete.enabled":"true","key.converter":"io.confluent.connect.avro.AvroConverter","value.converter":"io.confluent.connect.avro.AvroConverter","key.converter.schema.registry.url":"http://lg-schema01:8081","value.converter.schema.registry.url":"http://lg-schema01:8081","name":"mysql-sink-connector"},"tasks":[],"type":"sink"}
ubuntu@ip-172-31-0-29:~$ curl -X GET http://localhost:8083/connectors/mysql-sink-connector/status | jq '.'
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 1898 100 1898 0 0 31616 0 --:--:-- --:--:-- --:--:-- 32169
{
"name": "mysql-sink-connector",
"connector": {
"state": "RUNNING",
"worker_id": "172.31.13.238:8083"
},
"tasks": [
{
"id": 0,
"state": "FAILED",
"worker_id": "172.31.13.238:8083",
"trace": "java.lang.NoClassDefFoundError: com/google/common/base/Ticker\n\tat io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.<init>(CachedSchemaRegistryClient.java:175)\n\tat io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.<init>(CachedSchemaRegistryClient.java:158)\n\tat io.confluent.kafka.schemaregistry.client.SchemaRegistryClientFactory.newClient(SchemaRegistryClientFactory.java:36)\n\tat io.confluent.connect.avro.AvroConverter.configure(AvroConverter.java:68)\n\tat org.apache.kafka.connect.runtime.isolation.Plugins.newConverter(Plugins.java:297)\n\tat org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:640)\n\tat org.apache.kafka.connect.runtime.Worker.startSinkTask(Worker.java:544)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:1703)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$getTaskStartingCallable$31(DistributedHerder.java:1753)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: java.lang.ClassNotFoundException: com.google.common.base.Ticker\n\tat java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)\n\tat java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)\n\tat org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:103)\n\tat java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)\n\t... 13 more\n"
}
],
"type": "sink"
}
참고로 스키마 레지스토리 서버가 잘 붙나 확인 해본 결과 잘 붙습니다.
ubuntu@ip-172-31-0-29:~$ curl -v lg-schema01:8081
* Trying 172.31.46.33:8081...
* Connected to lg-schema01 (172.31.46.33) port 8081 (#0)
> GET / HTTP/1.1
> Host: lg-schema01:8081
> User-Agent: curl/7.81.0
> Accept: /
>
* Mark bundle as not supporting multiuse
< HTTP/1.1 200 OK
< Date: Mon, 13 Mar 2023 06:14:56 GMT
< Content-Type: application/vnd.schemaregistry.v1+json
< Vary: Accept-Encoding, User-Agent
< Content-Length: 2
<
* Connection #0 to host lg-schema01 left intact
{}ubuntu@ip-172-31-0-29:~$
현재 제가 구성한 환경 정보 입니다.
Apache Zookeeper 3.8.1 3개
Apache Kafka 2.13-3.3.2 3개
=> Source connect : Debezium mysql connect 2.1.2
=> Sink connect : Confluent JDBC Connector 10.6.3
=> Avro : Confluent Avro converter 7.3
Confluent Schema registry 7.3.0 1개
mysql DB 1 개 (Source 와 Sink 는 user 로 분리해서 데이터 넣는 방식으로 테스트 중입니다)
구글링 검색 결과 Guava 를 못찾아서 그런거 같다는데, 제가 설치한 환경의 디펜던시가 문제일까요?
답변 1
0
선생님, 위 에러 원인 찾다가
Kafka plug-in 설치된 곳을 가보니
/usr/local/kafka/connector-plugin/confluentinc-kafka-connect-avro-converter/lib/
경로안에 "guava-30.1.1-jre.jar" 이 파일이 없어서 제가 추가 했습니다.
스키마에는 잘 등록 되어서 아래와 같이 나왔습니다.
ubuntu@ip-172-31-46-33:~$ http http://localhost:8081/schemas
HTTP/1.1 200 OK
Content-Encoding: gzip
Content-Length: 195
Content-Type: application/vnd.schemaregistry.v1+json
Date: Mon, 13 Mar 2023 06:50:47 GMT
Vary: Accept-Encoding, User-Agent
[
{
"id": 2,
"schema": "{\"type\":\"record\",\"name\":\"Key\",\"namespace\":\"source_mysqldb.hesdp_mgr.TB_ME_TERMS_BAS_TEST\",\"fields\":[{\"name\":\"TERMS_VER_ID\",\"type\":\"string\"}],\"connect.name\":\"source_mysqldb.hesdp_mgr.TB_ME_TERMS_BAS_TEST.Key\"}",
"subject": "source_mysqldb.hesdp_mgr.TB_ME_TERMS_BAS_TEST-key",
"version": 1
}
]
source 쪽에 등록은 정상적으로 되었는데 sink 쪽 등록시 아래와 같이 에러가 납니다.
curl --location --request POST 'http://localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data-raw '{
"name": "mysql-sink-connector-test",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "source_mysqldb.hesdp_mgr.TB_ME_TERMS_BAS_TEST",
"connection.url": "jdbc:mysql://lg-mysql:3306/aurora2",
"connection.user": "sink",
"connection.password": "sink1234",
"table.name.format": "aurora2.TB_ME_TERMS_BAS_TEST",
"insert.mode": "upsert",
"pk.fields": "TERMS_VER_ID",
"pk.mode": "record_key",
"delete.enabled": "true",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://lg-schema01:8081",
"value.converter.schema.registry.url": "http://lg-schema01:8081"
}
}'
ubuntu@ip-172-31-0-29:/data/kafka-logs$ curl -X GET http://localhost:8083/connectors/mysql-sink-connector-test/status | jq '.'
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 2913 100 2913 0 0 176k 0 --:--:-- --:--:-- --:--:-- 284k
{
"name": "mysql-sink-connector-test",
"connector": {
"state": "RUNNING",
"worker_id": "172.31.10.77:8083"
},
"tasks": [
{
"id": 0,
"state": "FAILED",
"worker_id": "172.31.10.77:8083",
"trace": "org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:223)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:149)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:513)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:493)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:332)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic source_mysqldb.hesdp_mgr.TB_ME_TERMS_BAS_TEST to Avro: \n\tat io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:124)\n\tat org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:88)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$3(WorkerSinkTask.java:513)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:173)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:207)\n\t... 13 more\nCaused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!\n\tat io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:244)\n\tat io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.<init>(AbstractKafkaAvroDeserializer.java:334)\n\tat io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:202)\n\tat io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:172)\n\tat io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)\n\t... 17 more\n"
}
],
"type": "sink"
}
질문1. 제가 Guava file 을 kafka connect avro converter 설치된 lib 에 추가 하는게 맞나요?
질문2. 바로 위에 에러 난건 Sink connect 등록시 무었을 누락 시켜서 일까요?
0
먼저 이전에 질문 올리신 Source Connector는 이제 잘 동작하나요?
일단 그런걸로 가정하고,
뭔가 avro converter에 계속 문제가 있는 것 같군요. confluent hub 에서 다운로드 받은 io.confluent.connect.avro.AvroConverter 가 Guava 까지 필요한가요?
저도 apache kafka 에서 수동으로 Avro Converter를 이용해 본지가 넘 오래되서 기억이 가물가물한데 Guava 까지 필요했던건 아닌것 같습니다만...
오류만 보면 sink connector에서 avro 메시지를 deserialization하는데 문제가 발생하는 것 같습니다.
일단 Source Connector를 통해 topic에 메시지가 제대로 avro형태로 입력되었는지 먼저 확인해 보시지요. 강의를 참조하셔서 kafka-avro-console-consumer 명령어를 참조하셔서 해당 토픽에 메시지가 제대로 저장되었는지 확인해 보시고, 안되면 다시 글 부탁드립니다.
0
안녕하세요. 선생님
제가 수업에서 들은 카프카캣으로 내부 스키마 내용을 확인 해봤는데 AVRO로 변환이 하나도 안된거 같습니다.
ubuntu@ip-172-31-0-29:~$ kafkacat -b lg-kafka01:9092 -C -t source_mysqldb.hesdp_mgr.TB_ME_TERMS_BAS -J -u -q |jq '.'
{
"topic": "source_mysqldb.hesdp_mgr.TB_ME_TERMS_BAS",
"partition": 0,
"offset": 640,
"tstype": "create",
"ts": 1678689074221,
"broker": -1,
"key": "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"TERMS_VER_ID\"}],\"optional\":false,\"name\":\"source_mysqldb.hesdp_mgr.TB_ME_TERMS_BAS.Key\"},\"payload\":{\"TERMS_VER_ID\":\"1\"}}",
"payload": "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"TERMS_VER_ID\"},{\"type\":\"string\",\"optional\":false,\"field\":\"TERMS_TP_CODE\"},{\"type\":\"string\",\"optional\":true,\"field\":\"AGR_NOTI_FLAG\"},{\"type\":\"string\",\"optional\":true,\"field\":\"PROD_CODE\"},{\"type\":\"string\",\"optional\":true,\"field\":\"PLFM_CODE\"},{\"type\":\"string\",\"optional\":false,\"field\":\"CNTRY_CODE\"},{\"type\":\"string\",\"optional\":true,\"field\":\"TERMS_EXPLN\"},{\"type\":\"string\",\"optional\":true,\"field\":\"APPLY_STRT_YMD\"},{\"type\":\"string\",\"optional\":true,\"field\":\"STAT_CODE\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"GEN_USR_NO\"},{\"type\":\"int64\",\"optional\":true,\"name\":\"org.apache.kafka.connect.data.Timestamp\",\"version\":1,\"field\":\"CRT_DATE\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"LAST_CHG_USR_NO\"},{\"type\":\"int64\",\"optional\":true,\"name\":\"org.apache.kafka.connect.data.Timestamp\",\"version\":1,\"field\":\"LAST_CHG_DATE\"},{\"type\":\"string\",\"optional\":false,\"field\":\"USE_FLAG\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"APPRVR_NO\"},{\"type\":\"int64\",\"optional\":true,\"name\":\"org.apache.kafka.connect.data.Timestamp\",\"version\":1,\"field\":\"APPRV_DATE\"},{\"type\":\"string\",\"optional\":true,\"field\":\"APPRV_OPNN\"},{\"type\":\"string\",\"optional\":true,\"field\":\"APPRV_STAT_CODE\"},{\"type\":\"string\",\"optional\":true,\"field\":\"ENTP_WIDE_MBRSHP_FLAG\"},{\"type\":\"string\",\"optional\":false,\"field\":\"TERMS_MGT_TP_CODE\"}],\"optional\":false,\"name\":\"source_mysqldb.hesdp_mgr.TB_ME_TERMS_BAS.Value\"},\"payload\":{\"TERMS_VER_ID\":\"1\",\"TERMS_TP_CODE\":\"6\",\"AGR_NOTI_FLAG\":null,\"PROD_CODE\":\"3\",\"PLFM_CODE\":\"4\",\"CNTRY_CODE\":\"6\",\"TERMS_EXPLN\":\"7\",\"APPLY_STRT_YMD\":\"\",\"STAT_CODE\":null,\"GEN_USR_NO\":null,\"CRT_DATE\":null,\"LAST_CHG_USR_NO\":null,\"LAST_CHG_DATE\":null,\"USE_FLAG\":\"4\",\"APPRVR_NO\":null,\"APPRV_DATE\":null,\"APPRV_OPNN\":null,\"APPRV_STAT_CODE\":null,\"ENTP_WIDE_MBRSHP_FLAG\":null,\"TERMS_MGT_TP_CODE\":\"4\"}}"
}
원래는 위에 내용이 바이너리 형태로 되어있어야 했던거 같은데 순수 json 으로 보여집니다.
아마 제가 AVRO 설정을 잘못 한 느낌이 듭니다.
스키마에서 AVRO 로 안되어 있다보니, Source 쪽에서 바로 에러 나는거 같습니다.
왠지 처음부터 다시 해야 하는 느낌이 드네요.....
0
음, Source Connector에서 key와 value converter를 avro로 설정했는데, topic 메시지가 avro가 안될리가 없습니다만..
뭔가 지금 환경 구성이 잘못된건 아닌지요? Source Connector json 파일이 제대로 작성되고, 해당 json 파일로 connector가 잘 생성이 되었는지 확인이 필요할 것 같습니다.
혹시 지금 읽어들인 topic 메시지가 avro 메시지를 보내기 전에 json 메시지로 미리 만들어진 topic 메시지는 아닌지요?
뭔가 지금 작업 하시면서 살짝 꼬인 부분이 있는 것 같습니다. 기존 config 설정등의 작업 파일을 back 디렉토리에 옮겨놓으시고 기존 작업 파일들을 삭제하신 뒤에 차분히 하나씩 내용을 검토하면서 다시 작업을 해보시면 어떨까 싶습니다
계속 csv파일을 폴더로 인식하는 중입니다.
0
69
2
cdc failover
0
70
2
avro plugin에 안보이는거 같아요
0
70
2
zookeeper가 실행이 안되요
0
80
3
Debezium CDC Source Connector 이벤트 메시지
0
87
2
재 질문! 다른 connector가 동일 토픽 사용
0
68
2
메시지 발행이 안 됩니다.
0
78
2
디비지움 오프셋 리셋
0
96
2
mysql_jdbc_oc_sink_customers_02.json 에서 오류납니다.
0
72
3
io.debezium.connector.mysql.MySqlConnector 질문
0
68
2
Debezium 이벤트 메시지 발행 시 성공여부
0
82
2
connect 구동 오류
0
99
3
CDC Connect 통시에서의 암호화 관련 하여 문의 드립니다.
0
122
2
GCSSinkconnector
0
66
1
debezium source connector에서 `poll.interval.ms` 파라미터
0
145
2
자문자답: JDBC Sink Connector가 PostgreSQL의 Schema를 바라보지 않습니다.
0
214
1
table.name.format에 관하여..
0
197
2
binlog와 offset 관련 추가 질문드립니다
0
152
1
Debezium cdc source for MSSQL-Server [등록오류]
0
193
2
http http://localhost:8083/connector-plugins 시 플러그인이 보이지 않습니다.
0
192
2
안녕하세요 sink connector 생성 config 관련 질문입니다.
0
165
2
수강환경 VirtualBox -> Docker
0
232
2
일반적인 예시들이 궁금해서 질문 남깁니다.
1
149
2
debezium connector schema 찾지 못하는 오류
0
437
2





