kafka sink 설정 시 테이블 생성이 안됩니다
633
작성한 질문수 14
{
"name": "my-sink-connect",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mariadb://localhost:3306/mydb",
"connection.user": "root",
"connection.password": "test1234",
"mode": "incrementing",
"incrementing.column.name": "id",
"auto.create": "true",
"auto.evolve": "true",
"delete.enabled": "false",
"tasks.max": "1",
"topic": "my_topic_users",
"table.whitelist": "mydb.users"
}
}confluent-community-connect-7.5.0-zOS
confluentinc-kafka-connect-jdbc-10.7.4
이렇게 사용중이고
모드를 빼면 에러가 발생하네요
커넨터 로그엔 에러가 없어요
[2023-12-14 00:57:25,395] INFO SourceConnectorConfig values:
config.action.reload = restart
connector.class = io.confluent.connect.jdbc.JdbcSourceConnector
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
exactly.once.support = requested
header.converter = null
key.converter = null
name = my-sink-connect
offsets.storage.topic = null
predicates = []
tasks.max = 1
topic.creation.groups = []
transaction.boundary = poll
transaction.boundary.interval.ms = null
transforms = []
value.converter = null
(org.apache.kafka.connect.runtime.SourceConnectorConfig:369)
[2023-12-14 00:57:25,396] INFO [my-sink-connect|task-0] Validating JDBC URL. (io.confluent.connect.jdbc.dialect.DatabaseDialects:171)
[2023-12-14 00:57:25,396] INFO [my-sink-connect|task-0] Validated JDBC URL. (io.confluent.connect.jdbc.dialect.DatabaseDialects:174)
[2023-12-14 00:57:25,396] INFO [my-sink-connect|task-0] Using JDBC dialect MySql (io.confluent.connect.jdbc.source.JdbcSourceTask:138)
[2023-12-14 00:57:25,396] INFO EnrichedConnectorConfig values:
config.action.reload = restart
connector.class = io.confluent.connect.jdbc.JdbcSourceConnector
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
exactly.once.support = requested
header.converter = null
key.converter = null
name = my-sink-connect
offsets.storage.topic = null
predicates = []
tasks.max = 1
topic.creation.groups = []
transaction.boundary = poll
transaction.boundary.interval.ms = null
transforms = []
value.converter = null
(org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:369)
[2023-12-14 00:57:25,397] INFO [my-sink-connect|task-0] [Producer clientId=connector-producer-my-sink-connect-0] Cluster ID: 61ETmEcJQASp3yeJGdTmPw (org.apache.kafka.clients.Metadata:287)
[2023-12-14 00:57:25,413] INFO [my-sink-connect|task-0] Found offset {{table=users}=null, {protocol=1, table=mydb.users}={incrementing=17}} for partition {protocol=1, table=mydb.users} (io.confluent.connect.jdbc.source.JdbcSourceTask:234)
[2023-12-14 00:57:25,414] INFO [my-sink-connect|task-0] Started JDBC source task (io.confluent.connect.jdbc.source.JdbcSourceTask:307)
[2023-12-14 00:57:25,414] INFO [my-sink-connect|task-0] WorkerSourceTask{id=my-sink-connect-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.AbstractWorkerSourceTask:275)
[2023-12-14 00:57:25,414] INFO [my-sink-connect|task-0] Begin using SQL query: SELECT * FROM mydb.`users` WHERE mydb.`users`.`id` > ? ORDER BY mydb.`users`.`id` ASC (io.confluent.connect.jdbc.source.TableQuerier:182)
답변 1
0
안녕하세요, 인프런 AI 인턴입니다.
kim미소 파파님, Kafka Connect를 사용하여 sink 설정 시 테이블 생성이 안 되는 문제를 겪고 계신 것 같군요. 설정해주신 JSON 파일을 통해 볼 때, 마리아DB를 사용하고 있으며, auto.create는 true로 설정되어 자동으로 테이블을 생성하도록 설정되어 있습니다.
문제 해결을 위해서 확인할 점은 다음과 같습니다:
1. Kafka Connect JDBC Connector의 권한 확인: 사용하는 데이터베이스 사용자(root)가 테이블 생성 권한을 가지고 있는지 확인해보세요.
2. 테이블 이름과 스키마: table.whitelist에 명시된 테이블 이름과 스키마가 데이터베이스에 존재하는지 검토해 보세요.
3. Incrementing 컬럼: 설정하신 incrementing.column.name이 실제 테이블에 존재하며 올바른지 확인해보세요.
4. Kafka Connect 버전과 호환성: 사용하고 계신 Kafka Connect의 버전과 JDBC Connector의 버전이 서로 호환되는지 확인해보세요.
5. 에러 로그: 에러 로그가 없다고 하셨으나, 다시 한번 전체 로그를 검토하여 문제가 될 만한 부분이 없는지 확인해보세요.
참고로 Kafka 메시징 지원과 관련된 Spring Boot의 설정 예시는 다음과 같이 application.properties나 application.yml에 선언할 수 있습니다:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
또는
spring:
kafka:
bootstrap-servers: "localhost:9092"
consumer:
group-id: "myGroup"
따라서 이 정보들을 참고하여 설정을 재점검해보시면 좋겠습니다. 만약 여전히 문제를 해결하지 못하셨다면, 추가적으로 Kafka Connect의 로그나 특정 에러 메시지를 제공해주시면 더 구체적인 해결 방안을 찾는 데 도움이 될 수 있겠습니다.
kafka 업데이트 강의 듣고 시포요
0
82
1
강의 교안
0
68
1
마이크로서비스간 통신 시, 인증 처리
0
76
1
api gateway 에서 인증 처리
0
62
1
섹션 19 질문드립니다
0
50
1
강의 자료 업데이트
0
80
1
부하분산 강의 섹션
0
55
1
강의자료는 어디에서?
0
68
1
강의 자료는 어디서 다운 받을 수 있나요?
0
108
1
전체 사용자 조회시 오류
0
56
1
혹시 pk 외 별도의 id 를 부여한 이유가 있을까요 ??
0
107
2
학습 방향
0
93
2
카프카 커넥터 사용 목적 문의
0
84
2
kafka 강의
0
104
2
서비스 디스커버리 종류
0
85
2
강의 자료에 대해서 궁금해요
0
113
2
GlobalFilter, LoggingFilter가 동작하지 않습니다.
0
87
2
Kafka Source Connect 버전 에러
0
82
2
소스커넥터는 사용안한 거 맞죠?
0
79
2
강의자료 업데이트 문의
0
93
2
강의에서 BCryptPasswordEncoder 에 역할(5-2)
0
55
1
강의 업데이트 계획이 궁금합니다.
0
110
2
MSA 애플리케이션에 Spring Web과 Spring Data JPA를 사용하는 것이 바람직한지 궁금합니다. (MSA 설계와 관련된 질문입니다)
0
160
2
어떤 것이 업데이트 된 건가요?
0
160
2





