커스텀 elasticsearch sink connector를 만들었는데 class를 찾지 못하고 있어요
452
작성한 질문수 1
{
"error_code": 500,
"message": "Error trying to forward REST request: Failed to find any class that implements Connector and which name matches com.example.connector.kafka.ElasticsearchSinkConnector, available connectors are:
PluginDesc{klass=class com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector, name='com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector', version='0.0.0.0', encodedVersion=0.0.0.0, type=source, typeName='source', location='file:/data01/connectors/spooldir_source/'},
PluginDesc{klass=class com.github.jcustenborder.kafka.connect.spooldir.SpoolDirJsonSourceConnector, name='com.github.jcustenborder.kafka.connect.spooldir.SpoolDirJsonSourceConnector', version='0.0.0.0', encodedVersion=0.0.0.0, type=source, typeName='source', location='file:/data01/connectors/spooldir_source/'},
PluginDesc{klass=class com.github.jcustenborder.kafka.connect.spooldir.SpoolDirLineDelimitedSourceConnector, name='com.github.jcustenborder.kafka.connect.spooldir.SpoolDirLineDelimitedSourceConnector', version='0.0.0.0', encodedVersion=0.0.0.0, type=source, typeName='source', location='file:/data01/connectors/spooldir_source/'},
PluginDesc{klass=class com.github.jcustenborder.kafka.connect.spooldir.SpoolDirSchemaLessJsonSourceConnector, name='com.github.jcustenborder.kafka.connect.spooldir.SpoolDirSchemaLessJsonSourceConnector', version='0.0.0.0', encodedVersion=0.0.0.0, type=source, typeName='source', location='file:/data01/connectors/spooldir_source/'},
PluginDesc{klass=class com.github.jcustenborder.kafka.connect.spooldir.elf.SpoolDirELFSourceConnector, name='com.github.jcustenborder.kafka.connect.spooldir.elf.SpoolDirELFSourceConnector', version='0.0.0.0', encodedVersion=0.0.0.0, type=source, typeName='source', location='file:/data01/connectors/spooldir_source/'},
PluginDesc{klass=class io.confluent.connect.elasticsearch.ElasticsearchSinkConnector, name='io.confluent.connect.elasticsearch.ElasticsearchSinkConnector', version='11.0.0', encodedVersion=11.0.0, type=sink, typeName='sink', location='file:/data01/connectors/elasticsearch_sink/'},
PluginDesc{klass=class io.debezium.connector.sqlserver.SqlServerConnector, name='io.debezium.connector.sqlserver.SqlServerConnector', version='1.7.1.Final', encodedVersion=1.7.1.Final, type=source, typeName='source', location='file:/data01/connectors/mssql_source/'},
PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSinkConnector, name='org.apache.kafka.connect.file.FileStreamSinkConnector', version='2.8.2', encodedVersion=2.8.2, type=sink, typeName='sink', location='classpath'},
PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSourceConnector, name='org.apache.kafka.connect.file.FileStreamSourceConnector', version='2.8.2', encodedVersion=2.8.2, type=source, typeName='source', location='classpath'},
PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorCheckpointConnector, name='org.apache.kafka.connect.mirror.MirrorCheckpointConnector', version='1', encodedVersion=1, type=source, typeName='source', location='classpath'},
PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorHeartbeatConnector, name='org.apache.kafka.connect.mirror.MirrorHeartbeatConnector', version='1', encodedVersion=1, type=source, typeName='source', location='classpath'},
PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorSourceConnector, name='org.apache.kafka.connect.mirror.MirrorSourceConnector', version='1', encodedVersion=1, type=source, typeName='source', location='classpath'},
PluginDesc{klass=class org.apache.kafka.connect.tools.MockConnector, name='org.apache.kafka.connect.tools.MockConnector', version='2.8.2', encodedVersion=2.8.2, type=connector, typeName='connector', location='classpath'},
PluginDesc{klass=class org.apache.kafka.connect.tools.MockSinkConnector, name='org.apache.kafka.connect.tools.MockSinkConnector', version='2.8.2', encodedVersion=2.8.2, type=sink, typeName='sink', location='classpath'},
PluginDesc{klass=class org.apache.kafka.connect.tools.MockSourceConnector, name='org.apache.kafka.connect.tools.MockSourceConnector', version='2.8.2', encodedVersion=2.8.2, type=source, typeName='source', location='classpath'},
PluginDesc{klass=class org.apache.kafka.connect.tools.SchemaSourceConnector, name='org.apache.kafka.connect.tools.SchemaSourceConnector', version='2.8.2', encodedVersion=2.8.2, type=source, typeName='source', location='classpath'},
PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSinkConnector, name='org.apache.kafka.connect.tools.VerifiableSinkConnector', version='2.8.2', encodedVersion=2.8.2, type=source, typeName='source', location='classpath'},
PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSourceConnector, name='org.apache.kafka.connect.tools.VerifiableSourceConnector', version='2.8.2', encodedVersion=2.8.2, type=source, typeName='source', location='classpath'}"
}connector 등록 api를 날리면 이런 에러가 뜹니다.
그런데 /connector-plugins request를 날리면 plugin으로 인식하고 있어요.
SinkConnector를 상속한 클래스의 path도 맞는데 왜 동작하지 않는 건지 모르겠어요
[
{
"class": "com.example.connector.kafka.ElasticsearchSinkConnector",
"type": "sink",
"version": "1.0"
},
{
"class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
"type": "source",
"version": "0.0.0.0"
},
{
"class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirJsonSourceConnector",
"type": "source",
"version": "0.0.0.0"
},
{
"class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirLineDelimitedSourceConnector",
"type": "source",
"version": "0.0.0.0"
},
{
"class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirSchemaLessJsonSourceConnector",
"type": "source",
"version": "0.0.0.0"
},
{
"class": "com.github.jcustenborder.kafka.connect.spooldir.elf.SpoolDirELFSourceConnector",
"type": "source",
"version": "0.0.0.0"
},
{
"class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"type": "sink",
"version": "11.0.0"
},
{
"class": "io.debezium.connector.sqlserver.SqlServerConnector",
"type": "source",
"version": "1.8.1.Final"
},
{
"class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"type": "sink",
"version": "2.8.2"
},
{
"class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"type": "source",
"version": "2.8.2"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
"type": "source",
"version": "1"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
"type": "source",
"version": "1"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"type": "source",
"version": "1"
}
]
로그를 봐도 커넥트 로그만 나오고 등록한 커넥터의 로그는 나오지 않는 것 같은데 혹시 카프카와 연결해서 커넥터를 ide에서 디버깅할 수 있는 방법이 있을까요?
답변 2
0
안녕하세요!
커스텀 커넥터 jar가 정상적으로 plugin.path를 통해 등록되었음에도 불구하고 로드되지 못하는 이유는 다양할 수 있습니다. 올려주신 로그 메시지 만으로는 정확한 이유를 찾기는 어려운데요.
패키지 경로가 정확한지 확인
ElasticsearchSinkConnector가 SinkConnector로 정상적으로 상속되었는지 확인
uberJar(혹은 fatJar)로 만들 때 JDK 1.8 버전으로 컴파일 했는지 확인
위 3가지 방법을 시도해보시면 좋을것 같습니다.
0
안녕하세요 이하정 님, 인프런 AI 인턴이에요.
커넥터 등록 과정에서 클래스를 찾지 못하는 오류가 발생하고 있습니다. 이 경우에는 몇 가지 원인이 있을 수 있습니다.
커넥터 클래스가 올바른 패키지에 있는지 확인해주세요. 클래스의 경로와 패키지명이 일치하는지 확인해야 합니다.
커넥터 JAR 파일이 올바른 위치에 있는지 확인해주세요. 컨넥터 JAR 파일은 Kafka Connect 런타임의
plugin.path에 정의된 디렉토리에 위치해야 합니다.커넥터 클래스 내에서 필요한 종속성이 올바르게 제공되는지 확인해주세요. 커넥터 클래스가 의존하는 라이브러리나 파일들이 올바르게 제공되지 않으면 클래스를 찾지 못하는 오류가 발생할 수 있습니다.
IDE에서 커넥터를 디버깅하려면 Kafka Connect 런타임을 직접 실행하고 해당 런타임에 커넥터를 등록해야 합니다. 특정 IDE(예: IntelliJ)에서는 디버깅 환경을 구성하여 Kafka Connect를 실행하고 디버깅 세션을 시작할 수도 있습니다. 이를 통해 커넥터 클래스 내에서 발생하는 문제를 더 쉽게 찾고 해결할 수 있습니다.
위의 과정들을 확인하고 문제가 계속되면 더 자세한 로그나 오류 메시지를 확인하여 문제 해결을 시도해보세요. 도움이 되셨길 바랍니다!
consume 이후 lag가 줄어들지 않음
0
74
2
안녕하세요. 강의의 카프카 버전과 현재 시점의 카프카의 차이점 문의 드립니다.
0
112
2
멱등성 프로듀서 retries 관련 질문입니다.
0
97
2
채팅 서비스 개발 시 주의점이 있을까요?
0
98
2
충분히 큰 파티션 생성시 궁금증이 존재합니다.
0
86
2
KTable 키가 없는 레코드 처리
0
72
2
컨슈머 테스트 코드 작성
0
94
2
리밸런스 onPartitionRevoked이 필요한 상황
0
74
2
카프카 클러스터에서 감당 가능한 파티션(레플리카) 수 문의
0
119
2
reset offset 질문
0
77
2
KStreamJoinKTable 실행시 오류
0
78
2
auto.commit.interval.ms 옵션 관련 질문 드립니다.
0
141
2
파티션, 컨슈머 그룹, 컨슈머 관련 질문
0
117
1
java, kotlin
0
130
2
shutdownThread 에 대한 문의 입니다.
0
153
2
zookeeper실행시 오류가 발생합니다.
0
268
2
커스텀 소스 커넥터에서 Thread.sleep (1000) 은 왜 하는거에요?
0
152
2
처리량을 늘리기 위해서 파티션을 늘리고 컨슈머를 늘려야한다고 설명하셨는데요
0
180
3
파티션 개수와 컨슈머 개수의 처리량 관련 질문
0
163
2
동영상 및 이미지 처리 관련 문의 드립니다.
0
225
2
주키퍼 없이 사용 문의 드립니다.
0
302
1
kafka 를 띄우니 오류가 발생하고 종료합니다.
0
280
3
zookeeper 실행시 오류 .. 무엇을 해야 할까요?
0
220
2
파티션 추가로 해결할 수 있지만 늘어난 파티션은 줄일 수 없지 않나요?
0
192
2





