• 카테고리

    질문 & 답변
  • 세부 분야

    백엔드

  • 해결 여부

    해결됨

consumer가 작동하지 않습니다

23.12.19 13:25 작성 23.12.19 14:21 수정 조회수 295

0

consumer를 실행시키면

WARN [Consumer clientId=console-consumer, groupId=console-consumer-83590] Error while fetching metadata with correlation id 2 : {my_topic_users=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

이게 뜨고 이후에 데이터베이스에 데이터를 넣어도 아무것도 뜨지 않습니다.

 

http://localhost:8083/connectors/my-source-connect/status 실행시키면 이렇게 나옵니다.

{
    "name": "my-source-connect",
    "connector": {
        "state": "FAILED",
        "worker_id": "",
        "trace": "org.apache.kafka.connect.errors.ConnectException: java.sql.SQLInvalidAuthorizationSpecException: Could not connect to address=(host=localhost)(port=3307)(type=master) : GSS-API authentication exception\r\n\tat io.confluent.connect.jdbc.util.CachedConnectionProvider.getConnection(CachedConnectionProvider.java:59)\r\n\tat io.confluent.connect.jdbc.JdbcSourceConnector.start(JdbcSourceConnector.java:94)\r\n\tat org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:193)\r\n\tat org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:218)\r\n\tat org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:363)\r\n\tat org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:346)\r\n\tat org.apache.kafka.connect.runtime.WorkerConnector.doRun(WorkerConnector.java:146)\r\n\tat org.apache.kafka.connect.runtime.WorkerConnector.run(WorkerConnector.java:123)\r\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)\r\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\r\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\r\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\r\n\tat java.base/java.lang.Thread.run(Thread.java:842)\r\nCaused by: java.sql.SQLInvalidAuthorizationSpecException: Could not connect to address=(host=localhost)(port=3307)(type=master) : GSS-API authentication exception\r\n\tat org.mariadb.jdbc.internal.util.exceptions.ExceptionFactory.createException(ExceptionFactory.java:66)\r\n\tat org.mariadb.jdbc.internal.util.exceptions.ExceptionFactory.create(ExceptionFactory.java:192)\r\n\tat org.mariadb.jdbc.internal.protocol.AbstractConnectProtocol.connectWithoutProxy(AbstractConnectProtocol.java:1392)\r\n\tat org.mariadb.jdbc.internal.util.Utils.retrieveProxy(Utils.java:635)\r\n\tat org.mariadb.jdbc.MariaDbConnection.newConnection(MariaDbConnection.java:150)\r\n\tat org.mariadb.jdbc.Driver.connect(Driver.java:89)\r\n\tat java.sql/java.sql.DriverManager.getConnection(DriverManager.java:681)\r\n\tat java.sql/java.sql.DriverManager.getConnection(DriverManager.java:190)\r\n\tat io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.getConnection(GenericDatabaseDialect.java:250)\r\n\tat io.confluent.connect.jdbc.util.CachedConnectionProvider.newConnection(CachedConnectionProvider.java:80)\r\n\tat io.confluent.connect.jdbc.util.CachedConnectionProvider.getConnection(CachedConnectionProvider.java:52)\r\n\t... 12 more\r\nCaused by: java.sql.SQLException: GSS-API authentication exception\r\n\tat org.mariadb.jdbc.internal.com.send.authentication.gssapi.StandardGssapiAuthentication.authenticate(StandardGssapiAuthentication.java:169)\r\n\tat org.mariadb.jdbc.internal.com.send.authentication.SendGssApiAuthPacket.process(SendGssApiAuthPacket.java:133)\r\n\tat org.mariadb.jdbc.internal.protocol.AbstractConnectProtocol.authenticationHandler(AbstractConnectProtocol.java:752)\r\n\tat org.mariadb.jdbc.internal.protocol.AbstractConnectProtocol.createConnection(AbstractConnectProtocol.java:553)\r\n\tat org.mariadb.jdbc.internal.protocol.AbstractConnectProtocol.connectWithoutProxy(AbstractConnectProtocol.java:1387)\r\n\t... 20 more\r\nCaused by: javax.security.auth.login.LoginException: No LoginModules configured for Krb5ConnectorContext\r\n\tat java.base/javax.security.auth.login.LoginContext.init(LoginContext.java:269)\r\n\tat java.base/javax.security.auth.login.LoginContext.<init>(LoginContext.java:357)\r\n\tat org.mariadb.jdbc.internal.com.send.authentication.gssapi.StandardGssapiAuthentication.authenticate(StandardGssapiAuthentication.java:117)\r\n\t... 24 more\r\n"
    },
    "tasks": [],
    "type": "source"
}

 

강사님이 올려주신 카프카 파일을 사용했어요.

 

당연하겠지만 이후 sink나 DB연동 강의 따라해도 전혀 작동하지 않습니다..

답변 2

·

답변을 작성해보세요.

1

흑후추님의 프로필

흑후추

질문자

2023.12.28

https://spring.io/projects/spring-kafka/

 

SpringBoot 3.2 버전은 Kafka 3.6 버전을 사용해야한다고 해서 변경했습니다.

MariaDB는 11.2.2 버전, confluent 7.5.2 사용했습니다.

confluent 7.5* 버전 사용 시 발생하는 에러

Error: Could not find or load main class org.apache.kafka.connect.cli.ConnectDistributed
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.connect.cli.ConnectDistributed

bin\windows\kafka-run-class.bat 코드 변경

기존코드

rem Classpath addition for release
for %%i in ("%BASE_DIR%\libs\*") do (
	call :concat "%%i"
)

rem Classpath addition for core
for %%i in ("%BASE_DIR%\core\build\libs\kafka_%SCALA_BINARY_VERSION%*.jar") do (
	call :concat "%%i"
)

변경코드

rem Classpath addition for release
for %%i in ("%BASE_DIR%\libs\*") do (
	call :concat "%%i"
)

rem Classpath addition for LSB style path
if exist %BASE_DIR%\share\java\kafka\* (
	call:concat %BASE_DIR%\share\java\kafka\*
)

rem Classpath addition for core
for %%i in ("%BASE_DIR%\core\build\libs\kafka_%SCALA_BINARY_VERSION%*.jar") do (
	call :concat "%%i"
)

 

 confluent 실행 시 발생한 에러

log4j:ERROR Could not read configuration file from URL [file:C:/work/confluent-7.5.2/config/connect-log4j.properties].
java.io.FileNotFoundException: C:\work\confluent-7.5.2\config\connect-log4j.properties (지정된 경로를 찾을 수 없습니다)

bin\windows\connect-distributed.bat 코드 변경

기존 코드

rem Log4j settings
IF ["%KAFKA_LOG4J_OPTS%"] EQU [""] (
	set KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%BASE_DIR%/config/connect-log4j.properties
)

변경 코드

rem Log4j settings
IF ["%KAFKA_LOG4J_OPTS%"] EQU [""] (
	set KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%BASE_DIR%/etc/kafka/connect-log4j.properties
)

 

포스트맨에서 POST로 전송할 때

"connection.url":"jdbc:mysql://localhost:3306/mydb"
"table.whitelist":"users"

이 부분에서 mysql을 mariadb로, users를 mydb.users로 변경하여 사용했습니다.

 

0

인프런 AI 인턴님의 프로필

인프런 AI 인턴

2023.12.21

안녕하세요, 인프런 AI 인턴입니다.

흑후추님, Kafka에 대한 consumer 문제와 관련하여 문제를 확인하는 데 몇 가지 포인트를 드리겠습니다.

먼저, LEADER_NOT_AVAILABLE 경고는 Kafka가 토픽의 리더 파티션에 연결할 수 없음을 나타내는 경우에 발생할 수 있습니다. 이 문제가 일시적인 경우가 많으나, 문제가 지속되면 Kafka 클러스터 상태를 점검해보아야 합니다.

다음으로, my-source-connect의 상태가 FAILED로 표시되고 있는 점을 봤을 때, Kafka Connect의 소스 커넥터와 관련된 오류로 해당 커넥터가 데이터베이스에 연결하지 못하고 있는 상황으로 보입니다. 오류 메시지에는 java.sql.SQLInvalidAuthorizationSpecException과 관련하여 Could not connect to address=(host=localhost)(port=3307)(type=master) : GSS-API authentication exception이라는 정보가 있습니다. 이것은 JDBC 커넥션 설정의 문제나 데이터베이스의 인증 문제를 가리키고 있습니다.

해결 방안으로 다음과 같은 점을 확인해보셔야 합니다:
1. 데이터베이스가 실행 중이고, localhost의 3307 포트에서 올바르게 리스닝하고 있는지 확인해야 합니다.
2. Kafka Connect의 데이터베이스 연결 설정이 올바른지 확인해야 합니다. 데이터베이스 유저 이름, 비밀번호 등이 정확한지 점검해보세요.
3. GSS-API 인증에 문제가 있다면 데이터베이스의 인증 설정을 확인하시거나, 필요하다면 다른 인증 방식을 사용해야 할 수 있습니다.
4. Kafka나 데이터베이스의 로그를 추가로 확인하여 더 구체적인 오류 메시지를 찾아볼 수 있습니다.

문제 해결을 위해서는 Kafka와 데이터베이스의 상세한 설정 및 로그 분석이 필요하며, 필요에 따라 Kafka Connect의 커넥터 설정을 수정할 수도 있습니다.

Kafka 관련 문서를 참고하시면 도움이 될 수 있으니, 아래 링크들을 확인해보세요:
- Kafka 컨슈머 설정 관련: Apache Kafka Support
- Embedded Kafka 사용법: Testing with Embedded Kafka
- Kafka Streams 설정 관련: Kafka Streams

이 정보들이 문제 해결에 도움이 되길 바랍니다.