inflearn logo
강의

강의

N
챌린지

챌린지

멘토링

멘토링

N
클립

클립

로드맵

로드맵

지식공유

Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)

Kafka Sink Connect 사용

kafka sink 설정 시 테이블 생성이 안됩니다

633

kim미소 파파

작성한 질문수 14

1

{
 "name": "my-sink-connect",
"config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:mariadb://localhost:3306/mydb",
    "connection.user": "root",
    "connection.password": "test1234",
     "mode": "incrementing",
    "incrementing.column.name": "id",
    "auto.create": "true",
    "auto.evolve": "true",
    "delete.enabled": "false",
    "tasks.max": "1",
    "topic": "my_topic_users",  
    "table.whitelist": "mydb.users"  
  }
}

confluent-community-connect-7.5.0-zOS

 

confluentinc-kafka-connect-jdbc-10.7.4

이렇게 사용중이고

 

모드를 빼면 에러가 발생하네요

커넨터 로그엔 에러가 없어요

 

[2023-12-14 00:57:25,395] INFO SourceConnectorConfig values:

config.action.reload = restart

connector.class = io.confluent.connect.jdbc.JdbcSourceConnector

errors.log.enable = false

errors.log.include.messages = false

errors.retry.delay.max.ms = 60000

errors.retry.timeout = 0

errors.tolerance = none

exactly.once.support = requested

header.converter = null

key.converter = null

name = my-sink-connect

offsets.storage.topic = null

predicates = []

tasks.max = 1

topic.creation.groups = []

transaction.boundary = poll

transaction.boundary.interval.ms = null

transforms = []

value.converter = null

(org.apache.kafka.connect.runtime.SourceConnectorConfig:369)

[2023-12-14 00:57:25,396] INFO [my-sink-connect|task-0] Validating JDBC URL. (io.confluent.connect.jdbc.dialect.DatabaseDialects:171)

[2023-12-14 00:57:25,396] INFO [my-sink-connect|task-0] Validated JDBC URL. (io.confluent.connect.jdbc.dialect.DatabaseDialects:174)

[2023-12-14 00:57:25,396] INFO [my-sink-connect|task-0] Using JDBC dialect MySql (io.confluent.connect.jdbc.source.JdbcSourceTask:138)

[2023-12-14 00:57:25,396] INFO EnrichedConnectorConfig values:

config.action.reload = restart

connector.class = io.confluent.connect.jdbc.JdbcSourceConnector

errors.log.enable = false

errors.log.include.messages = false

errors.retry.delay.max.ms = 60000

errors.retry.timeout = 0

errors.tolerance = none

exactly.once.support = requested

header.converter = null

key.converter = null

name = my-sink-connect

offsets.storage.topic = null

predicates = []

tasks.max = 1

topic.creation.groups = []

transaction.boundary = poll

transaction.boundary.interval.ms = null

transforms = []

value.converter = null

(org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:369)

[2023-12-14 00:57:25,397] INFO [my-sink-connect|task-0] [Producer clientId=connector-producer-my-sink-connect-0] Cluster ID: 61ETmEcJQASp3yeJGdTmPw (org.apache.kafka.clients.Metadata:287)

[2023-12-14 00:57:25,413] INFO [my-sink-connect|task-0] Found offset {{table=users}=null, {protocol=1, table=mydb.users}={incrementing=17}} for partition {protocol=1, table=mydb.users} (io.confluent.connect.jdbc.source.JdbcSourceTask:234)

[2023-12-14 00:57:25,414] INFO [my-sink-connect|task-0] Started JDBC source task (io.confluent.connect.jdbc.source.JdbcSourceTask:307)

[2023-12-14 00:57:25,414] INFO [my-sink-connect|task-0] WorkerSourceTask{id=my-sink-connect-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.AbstractWorkerSourceTask:275)

[2023-12-14 00:57:25,414] INFO [my-sink-connect|task-0] Begin using SQL query: SELECT * FROM mydb.`users` WHERE mydb.`users`.`id` > ? ORDER BY mydb.`users`.`id` ASC (io.confluent.connect.jdbc.source.TableQuerier:182)

spring-boot jpa 아키텍처 spring-cloud kafka msa rabbitmq

답변 1

0

인프런 AI 인턴

안녕하세요, 인프런 AI 인턴입니다.
kim미소 파파님, Kafka Connect를 사용하여 sink 설정 시 테이블 생성이 안 되는 문제를 겪고 계신 것 같군요. 설정해주신 JSON 파일을 통해 볼 때, 마리아DB를 사용하고 있으며, auto.createtrue로 설정되어 자동으로 테이블을 생성하도록 설정되어 있습니다.

문제 해결을 위해서 확인할 점은 다음과 같습니다:
1. Kafka Connect JDBC Connector의 권한 확인: 사용하는 데이터베이스 사용자(root)가 테이블 생성 권한을 가지고 있는지 확인해보세요.
2. 테이블 이름과 스키마: table.whitelist에 명시된 테이블 이름과 스키마가 데이터베이스에 존재하는지 검토해 보세요.
3. Incrementing 컬럼: 설정하신 incrementing.column.name이 실제 테이블에 존재하며 올바른지 확인해보세요.
4. Kafka Connect 버전과 호환성: 사용하고 계신 Kafka Connect의 버전과 JDBC Connector의 버전이 서로 호환되는지 확인해보세요.
5. 에러 로그: 에러 로그가 없다고 하셨으나, 다시 한번 전체 로그를 검토하여 문제가 될 만한 부분이 없는지 확인해보세요.

참고로 Kafka 메시징 지원과 관련된 Spring Boot의 설정 예시는 다음과 같이 application.propertiesapplication.yml에 선언할 수 있습니다:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup

또는

spring:
  kafka:
    bootstrap-servers: "localhost:9092"
    consumer:
      group-id: "myGroup"

Apache Kafka 지원 문서

따라서 이 정보들을 참고하여 설정을 재점검해보시면 좋겠습니다. 만약 여전히 문제를 해결하지 못하셨다면, 추가적으로 Kafka Connect의 로그나 특정 에러 메시지를 제공해주시면 더 구체적인 해결 방안을 찾는 데 도움이 될 수 있겠습니다.

kafka 업데이트 강의 듣고 시포요

0

82

1

강의 교안

0

68

1

마이크로서비스간 통신 시, 인증 처리

0

76

1

api gateway 에서 인증 처리

0

62

1

섹션 19 질문드립니다

0

50

1

강의 자료 업데이트

0

80

1

부하분산 강의 섹션

0

55

1

강의자료는 어디에서?

0

68

1

강의 자료는 어디서 다운 받을 수 있나요?

0

108

1

전체 사용자 조회시 오류

0

56

1

혹시 pk 외 별도의 id 를 부여한 이유가 있을까요 ??

0

107

2

학습 방향

0

93

2

카프카 커넥터 사용 목적 문의

0

84

2

kafka 강의

0

104

2

서비스 디스커버리 종류

0

85

2

강의 자료에 대해서 궁금해요

0

113

2

GlobalFilter, LoggingFilter가 동작하지 않습니다.

0

87

2

Kafka Source Connect 버전 에러

0

82

2

소스커넥터는 사용안한 거 맞죠?

0

79

2

강의자료 업데이트 문의

0

93

2

강의에서 BCryptPasswordEncoder 에 역할(5-2)

0

55

1

강의 업데이트 계획이 궁금합니다.

0

110

2

MSA 애플리케이션에 Spring Web과 Spring Data JPA를 사용하는 것이 바람직한지 궁금합니다. (MSA 설계와 관련된 질문입니다)

0

160

2

어떤 것이 업데이트 된 건가요?

0

160

2