월 19,800원
5개월 할부 시다른 수강생들이 자주 물어보는 질문이 궁금하신가요?
- 미해결카프카 완벽 가이드 - 커넥트(Connect) 편
connect-distributed.properties 설정
궁금한 사항이 있어서 질문 남깁니다.connect-distributed.properties 설정 관련 내용입니다.3노드 클러스터로 구성했는데 커넥트 기동하기 강의 편에서설정하는 부분이 있습니다.제가 3노드로 구성했는데 # 삭제하고listeners=HTTP://192.168.20.26:8083,HTTP://192.168.20.27:8083,HTTP://192.168.20.28:8083이렇게 구성하는게 맞는건지 궁금합니다.현재 설정은 #으로 주석 처리되어있습니다.감사합니다.
- 미해결카프카 완벽 가이드 - 커넥트(Connect) 편
connector의 적정 tasks.max 값은 어떻게 될까요?
s3 sink connector 설정 관련해서 문의드립니다.적절한 tasks.max의 값은 어떻게 될까요?여러 요인이 있겠지만 특히 토픽의 파티션 개수와 관련하여 정해야하는지 문의드립니다.
- 미해결카프카 완벽 가이드 - 커넥트(Connect) 편
Debezium 문의
Debezium 으로 cdc를 구축하고자 하는데 confluent kafka에서도 지원이 가능한지 문의드립니다.
- 미해결카프카 완벽 가이드 - 커넥트(Connect) 편
문의드림
https://archive.apache.org/dist/kafka/2.8.2/kafka_2.13-2.8.2.tgz 여기서 다운 받는 카푸카랑 confluent kafka랑 다른지 문의드립니다.
- 미해결카프카 완벽 가이드 - 커넥트(Connect) 편
클러스터 구성 가능 여부
클러스터 3노드(VM 3개 띄울 예정)로 구성하고자 합니다. confluent kafka로 3노드의 클러스터 구성이 가능한가요??? 확인 부탁드립니다. 감사합니다...
- 미해결카프카 완벽 가이드 - 커넥트(Connect) 편
Debezium Source 에서 topic에 저장되는 UTC시간대 질문
안녕하세요. topic에 UTC 시간대로 저장되는 문제가 있습니다.해결 접근방법에 조언을 듣고 싶습니다.Sink를 적용했을때 customers, products,order_items는 문제없이 적용되었으나 orders테이블의 timestamp타입의 order_datetime컬럼에 문제가 발생하여 SMT 옵션을 추가하다가 발견한 문제입니다.결론적으로 mysql_cdc_oc_sink_orders_01.json에 "transforms": "ConvertDateTimeType", "transforms.ConvertDateTimeType.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value", "transforms.ConvertDateTimeType.target.type": "Timestamp", "transforms.ConvertDateTimeType.field": "order_datetime", "transforms.ConvertDateTimeType.format": "yyyy-MM-dd'T'HH:mm:ss'Z'", "transforms.ConvertDateTimeType.timezone": "Asia/Seoul"위 옵션을 추가하여 sink로 저장을 해결하였으나 topic에 저장되는 시간이 다르게 저장되는것을 발견했습니다.source 데이터베이스에서는 2023-06-20 13:56:40 에 저장하였으나sink 데이터베이스에서는 2023-06-20 04:56:40으로 저장되고 있었습니다.이에 topic을 확인해보니 저장되는 시간대가 2023-06-20 04:56:40으로 topic에서부터 저장되는 값이 다른 것을 알 수 있었습니다.따라서 source설정쪽이 문제일 것 같은데 "database.connectionTimeZone": "Asia/Seoul"옵션을 넣었음에도 UTC로 적용되고있어 질문드립니다. 감사합니다. mysql_cdc_oc_source_01.json{ "name": "mysql_cdc_oc_source_01", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "192.168.56.101", "database.port": "3306", "database.user": "connect_dev", "database.password": "connect_dev", "database.server.id": "10001", "database.server.name": "mysql01", "database.include.list": "oc", "table.include.list": "oc.customers, oc.products, oc.orders, oc.order_items", "database.history.kafka.bootstrap.servers": "192.168.56.101:9092", "database.history.kafka.topic": "schema-changes.mysql.oc", "key.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "transforms": "unwrap", "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", "transforms.unwrap.drop.tombstones": "false", "database.connectionTimeZone": "Asia/Seoul" } }MYSQL의 TIME_ZONE은 한국시간대입니다.mysql> select @@system_time_zone; +--------------------+ | @@system_time_zone | +--------------------+ | KST | +--------------------+
- 미해결카프카 완벽 가이드 - 커넥트(Connect) 편
[질문 아님]
나름 대로 개인 프로젝트 만들어가는중입니다.요 강의에서 Zookeeper과 Kafka 분리 설치 및 3노드 구성하여 클러스터 구축 완료...제가 오라클 DBA 14년차이지만 이게 저랑 잘맞는거 같습니다. 좋은 강의 끝까지 열심히 들을게요수고하세요~
- 해결됨카프카 완벽 가이드 - 커넥트(Connect) 편
커넥터, 스키마 레지스트리 관련 질문입니다.
안녕하세요 강사님. 커넥터, 스키마 레지스트리 관련 질문드립니다!현재 json형식의 데이터를 s3 sink connector를 통하여 parquet 형식으로 저장하려고 합니다. json 형식의 데이터는 키 값들이 일정하진 않습니다. 예를 들어 어떤 데이터는 { "test1":"test", "test2":"test2"} 이런식이고, 어떤 데이터는 {"test1":"test1"} 이런 식입니다. 이런 경우에도 스키마 레지스트리를 활용하여 적재가 가능할까요? 없는 키 값들에 대해선 default로 null값을 스키마에 명시하면 자동으로 null처리가 되어 들어오는지 궁금합니다. 그리고 아래처럼 커넥터에 설정을 주면 자동으로 json형식의 데이터가 들어올때 커넥터에서 스키마레지스트리를 바라보고 스키마를 읽어서 parquet로 적재가 되는건지 문의드립니다.value.converter.schema.registry.url=localhost:8080value.converter=io.confluent.connect.avro.AvroConverterformat.class=io.confluent.connect.s3.format.parquet.ParquetFormat
- 해결됨카프카 완벽 가이드 - 커넥트(Connect) 편
oracle source connector 관련 문의
안녕하세요 강사님cdc 관련 적용을 하다가 질문이 있어서 글을 남기게 되었습니다. 오라클 source connector를 생성하려고 하는데요.아래와 같은 설정을 하였는데 DB 전체 스키마에 대해서 스냅샷을 진행하여 시간이 너무 오래 걸리는데요. 혹시 제가 빠뜨린 설정이 있을까요?오라클의 경우는전체 스키마는 5개정도이지만 실제 사용하고자 하는 스키마는 2개에 포함된 테이블 몇 개 정도입니다.{"name": "v2_ora_source_connector_20230516_01","config" : {"connector.class" : "io.debezium.connector.oracle.OracleConnector","db_type":"oracle","tasks.max" : "1","database.server.name" : "v2_source_connector_20230516_01","database.user" : "TEST11","database.password" : "TEST11","database.url": "jdbc:oracle:thin:@10.74.XXX.XXXX:1521:XXXXX","database.dbname" : "SIDV_V2_20230516_01","database.out.server.name":"v2_ora_source_out_20230516_01","schema.history.internal.kafka.bootstrap.servers" : "10.74.XXX.XXX:9092","schema.history.internal.kafka.topic": "ora_source_history_20230516_01","schema.include.list": "TEST11 , TEST22","include.schema.changes": "true","database.connection.adapter": "logminer","topic.prefix": "V2_ORA_SOURCE","table.include.list":"TEST11.GD_CDC_WORK_REQ_MGMT , TEST11.VD_CDC_VEND , TEST22.AM_CDC_ORDER , TEST22.AM_CDC_CLAIM , TEST22.AM_CDC_COUNSEL" ,"include.schema.changes": "true","auto.evolve": "true","time.precision.mode": "connect","key.converter": "io.confluent.connect.avro.AvroConverter","value.converter": "io.confluent.connect.avro.AvroConverter","key.converter.schema.registry.url": "http://localhost:8081","value.converter.schema.registry.url": "http://localhost:8081","snapshot.mode" : "initial","tombstones.on.delete": "true","transforms": "rename_topic","transforms.rename_topic.type" : "org.apache.kafka.connect.transforms.RegexRouter","transforms.rename_topic.regex" : "V2_ORA_SOURCE(.*)","transforms.rename_topic.replacement" : "v2_source_$1","transforms": "unwrap","transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState","transforms.unwrap.drop.tombstones": "false"}}
- 해결됨카프카 완벽 가이드 - 커넥트(Connect) 편
안녕하세요~~
스키마 레지스트리 강의 듣다가 궁금해서 질문드립니다.하위 호환성에서 쓰기 스키마 V1 읽기 스키마 V2가 있다고 교재에 나와있는데요.저기에 나와있는 스키마 즉 쓰기 스키마는 source db의 table의 스키마이고 읽기 스키마는 동기화되는 target db의 table이 맞죠?그리고 쓰기 스키마를 기준으로 producer와 consumer가 데이터를 serialize, deserialize 하구요?V1, V2라고 되어있고 그래서 schema registry에 있는 version으로 생각했었는데 생각해보니 만약 서로(source, target) 스키마 레지스트리에서 사용하는 버전이 다르다면 그거를 커넥터 정의할 경우 정의를 해주고 해야하는데 그런게 없어서 궁금해서 여쭤봐요~~
- 미해결카프카 완벽 가이드 - 커넥트(Connect) 편
안녕하세요
궁금한게 있어서 질문드립니다~~consumer를 구현할 때 데이터를 올드 데이터가 덮어쓰는 경우나 삭제 처리 때문에 날짜를 validation하는 로직이 들어가게 되는데요데이터를 올드 데이터가 덮어쓰는 경우는 저장소가 장애가나거나 비즈니스 로직으로인해 에러가 나고 뉴 데이터는 들어가게 되었을 때 올드 데이터가 데드레터에 들어가 있고 복구된다면 덮어쓰게 됩니다.consumer -> validation storevalidation 통과후consumer -> source store위의 과정 후에 저장하게 됩니다.그런데 카프카 커넥터를 기반으로 했을 때는 위의 과정 처리를 어떻게 할 수 있을까요?물론 로직이 들어가게 되므로 커넥터를 사용하지 않고 구현하는게 맞는 것 같기는한데 만약 그렇게 될 경우에는 카프카 커넥터의 사용범위가 로그성 데이터나 초기 데이터를 이관할 때 정도로 사용되는 범위가 축소될 것 같아서 질문드려 봅니다! 감사합니다
- 미해결카프카 완벽 가이드 - 커넥트(Connect) 편
[수업질문] mysql_cdc_oc_source_test01.json 업로드 에러
안녕하세요. 'Debezium Source Connector 생성하기'수강중에 진행이 막혀 질문드립니다.config를 register_connector로 등록할 때 발생한 이슈인데해당 이슈 : { "error_code": 400, "message": "Connector configuration is invalid and contains the following 1 error(s):\nUnable to connect: Communications link failure\n\nThe last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.\nYou can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`" }= 커넥터 구성이 잘못되어 MySQL 서버에 연결할 수 없다로 해석되어 다음과 같은 해결책을 시도해보았습니다.0. http GET http://localhost:8083/connector-plugins | jq '.[].class' 조회시 io.debezium.connector.mysql.MySqlConnector 정상등록 확인1.MySQL 서버 정상실행중 확인2.포트 확인 > SHOW VARIABLES LIKE 'port'; >> 33063.네트워크 연결 상태 확인3-1. mysql 외부접속 허용 확인 : my.cnf에서 bind항목 점검 >> 없음=기본세팅인 허용3-2. 방화벽 규칙 확인 : Chain INPUT, FORWARD, OUTPUT : Accept , sudo iptables -A INPUT -p tcp --dport 3306 -j ACCEPT4.SSL 문제인가? >> my.cnf에 따로 ssl-mode 설정은 없음.5.ifconfig >> enp0s8 inet 192.168.56.101 이렇게 확인했음에도 해결되지않아 도움요청드립니다. config{ "name": "mysql_cdc_oc_source_test01", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "192.168.56.101", "database.port": "3306", "database.user": "connect_dev", "database.password": "connect_dev", "database.server.id": "10000", "database.server.name": "test01", "database.include.list": "oc", "database.allowPublicKeyRetrieval": "true", "database.history.kafka.bootstrap.servers": "192.168.56.101:9092", "database.history.kafka.topic": "schema-changes.mysql.oc", "key.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter" } }connect 로그는 다음과 같습니다.ERROR Failed testing connection for jdbc:mysql://192.168.56.101:3306/?useInformationSchema=true&nullCatalogMeansCurrent=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL&connectTimeout=30000 with user 'connect_dev' (io.debezium.connector.mysql.MySqlConnector:103) com.mysql.cj.jdbc.exceptions.CommunicationsException: Communications link failure The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server. at com.mysql.cj.jdbc.exceptions.SQLError.createCommunicationsException(SQLError.java:174) at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:64) at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:829) at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:449) at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:242) at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:198) at io.debezium.jdbc.JdbcConnection.lambda$patternBasedFactory$1(JdbcConnection.java:244) at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:888) at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:883) at io.debezium.jdbc.JdbcConnection.connect(JdbcConnection.java:411) at io.debezium.connector.mysql.MySqlConnector.validateConnection(MySqlConnector.java:98) at io.debezium.connector.common.RelationalBaseSourceConnector.validate(RelationalBaseSourceConnector.java:54) at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:459) at org.apache.kafka.connect.runtime.AbstractHerder.lambda$validateConnectorConfig$2(AbstractHerder.java:362) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:829) Caused by: com.mysql.cj.exceptions.CJCommunicationsException: Communications link failure
- 해결됨카프카 완벽 가이드 - 커넥트(Connect) 편
JDBC source connector - SQL server 질문
항상 답변 감사합니다.SQL server 에서 JDBC source connector 연결 구성중에 질문있어 드립니다. 환경세팅은 강의와 유사하게 진행하였습니다.timestamp와 incrementing모드로 Config json을 구성하였고MS-SQL DB는 connect_dev 계정에 om 데이터베이스의 customers 테이블을 사용하였습니다.쿼리는 변환하여 삽입하였습니다 { "name": "mssql_jdbc_source_customers", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "tasks.max": "1", "connection.url": "jdbc:sqlserver://localhost:1433;databaseName=om;trustServerCertificate=true", "connection.user": "connect_dev", "connection.password": "connect_dev", "topic.prefix": "mssql_jdbc_", "table.whitelist": "om.customers", "poll.interval.ms": 10000, "mode": "timestamp+incrementing", "incrementing.column.name": "customer_id", "timestamp.column.name": "system_upd" } }config 역시 강의와 유사하나 차이점으론 trustServerCertificate=true 옵션을 줘서 ssl 인증 문제를 회피하였습니다. 이후 질문 드리고 싶은 주요 문제로는 아래의 에러메시지ERROR [mssql_jdbc_source_customers|task-0] WorkerSourceTask{id=mssql_jdbc_source_customers-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:195) org.apache.kafka.connect.errors.ConnectException: Cannot make incremental queries using timestamp columns [system_upd] on [] because all of these columns nullable. at io.confluent.connect.jdbc.source.JdbcSourceTask.validateNonNullable(JdbcSourceTask.java:546) at io.confluent.connect.jdbc.source.JdbcSourceTask.start(JdbcSourceTask.java:196) at org.apache.kafka.connect.runtime.WorkerSourceTask.initializeAndStart(WorkerSourceTask.java:225) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:186) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:829) timestamp columns [system_upd] on [] because all of these columns nullable. (null이라 문제다)가 원인이라는건데,SELECT TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, IS_NULLABLE FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = 'customers' AND COLUMN_NAME = 'system_upd'해당 쿼리를 통해 IS_NULLABLE 컬럼이 NO로 되어있음(=null이 아님)을 확인하였습니다. 이 null 문제의 경우 다른 생각해 볼 접근방법이 뭐가 있을지 여쭤보고싶습니다.incrementing 모드에서는 토픽생성까지 성공하였습니다.감사합니다.
- 미해결카프카 완벽 가이드 - 커넥트(Connect) 편
안녕하세요. oracle source connector 생성 후 topic 만들 시 발생하는 오류 질문입니다.
안녕하세요. debezium oracle connector를 생성하고 topic 만들 시에 발생하는 오류에 관련하여 질문 있습니다.우선 오류는 [2023-05-07 20:39:26,049] INFO WorkerSourceTask{id=oracle_connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:505)[2023-05-07 20:39:26,049] WARN Couldn't commit processed log positions with the source database due to a concurrent connector shutdown or restart (io.debezium.connector.common.BaseSourceTask:243)[2023-05-07 20:39:31,455] WARN Couldn't resolve server kafka:9092 from bootstrap.servers as DNS resolution failed for kafka (org.apache.kafka.clients.ClientUtils:75)[2023-05-07 20:39:31,457] INFO [Producer clientId=UK-dbhistory] Closing the Kafka producer with timeoutMillis = 0 ms. (org.apache.kafka.clients.producer.KafkaProducer:1189)[2023-05-07 20:39:31,457] INFO WorkerSourceTask{id=oracle_connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:488)[2023-05-07 20:39:31,458] INFO WorkerSourceTask{id=oracle_connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:505)[2023-05-07 20:39:31,459] ERROR WorkerSourceTask{id=oracle_connector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:191)org.apache.kafka.common.KafkaException: Failed to construct kafka producer at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:441) at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:301) at io.debezium.relational.history.KafkaDatabaseHistory.start(KafkaDatabaseHistory.java:235) at io.debezium.relational.HistorizedRelationalDatabaseSchema.<init>(HistorizedRelationalDatabaseSchema.java:40) at io.debezium.connector.oracle.OracleDatabaseSchema.<init>(OracleDatabaseSchema.java:35) at io.debezium.connector.oracle.OracleConnectorTask.start(OracleConnectorTask.java:55) at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:106) at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:232) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:239) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750)Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:89) at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:48) at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:415) ... 14 more[2023-05-07 20:39:31,461] ERROR WorkerSourceTask{id=oracle_connector-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:192)[2023-05-07 20:39:31,461] INFO Stopping down connector (io.debezium.connector.common.BaseSourceTask:192)[2023-05-07 20:39:31,462] WARN Could not stop task (org.apache.kafka.connect.runtime.WorkerSourceTask:175)java.lang.NullPointerException at io.debezium.connector.oracle.OracleConnectorTask.doStop(OracleConnectorTask.java:129) at io.debezium.connector.common.BaseSourceTask.stop(BaseSourceTask.java:206) at io.debezium.connector.common.BaseSourceTask.stop(BaseSourceTask.java:176) at org.apache.kafka.connect.runtime.WorkerSourceTask.close(WorkerSourceTask.java:173) at org.apache.kafka.connect.runtime.WorkerTask.doClose(WorkerTask.java:168) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:195) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:239) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) 이렇게 발생하는데 topic은 생성되지 않고 connector만 생성되었습니다.kafka task를 삭제하지 못하고 있다는데, 어떻게 해야하나요? connector를 삭제하고 다시 생성해도 동일한 오류가 발생합니다. -oracle connector 설정 파일입니다-{ "name": "oracle_connector", "config": { "connector.class": "io.debezium.connector.oracle.OracleConnector", "key.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter.schemas.registry.url": "http://0.0.0.0:8081", "database.hostname": "mydb", "database.port": "port", "database.user": "user", "database.password": "userpwd", "database.dbname": "dbname", "topic.prefix": "test_", "tasks.max": "1", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "dbhistory.testdb", "database.server.name": "servername", "database.out.server.name": "servername" } }
- 해결됨카프카 완벽 가이드 - 커넥트(Connect) 편
안녕 하세요
안녕 하세요 궁금한 사항이 있어서 문의드립니다.Debezium CDC source connector 강의 내용중redo 로그에 commit되지 않은 데이터도 쓰여진다고 말씀해주셨는데요저보다 당연히 잘 아시겠지만 제가 알기로는 commit하면 버퍼에 쌓이고 디스크에는 반영되지 않고 그 이후에 주기적으로redo log에 쌓이는 것으로 알고 있습니다즉 commit이 되어야 redo log에 쌓이는 걸로 알고 있는데 제가 잘못알고 있는걸까요? 감사합니다.
- 해결됨카프카 완벽 가이드 - 커넥트(Connect) 편
Git 코드에 ', '빠져 있습니다.
https://github.com/chulminkw/KafkaConnect/blob/723d598394241434f424184998176c903c0967f9/%EC%8B%A4%EC%8A%B5%EC%88%98%ED%96%89/JDBC%20Sink%20Connector%20%EC%8B%A4%EC%8A%B5.mdSource 테이블과 연계하여 Sink 테이블에 데이터 연동 테스트다른 테이블에 대해서도 Sink Connector를 생성하고 Source 테이블에 데이터 입력하여 Sink(Target) 테이블에 데이터가 동기화 되는지 확인.products_sink용 sink connector를 위해서 아래 설정을 mysql_jdbc_sink_products.json 파일에 저장.{ "name": "mysql_jdbc_sink_products", "config": { "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max": "1", "topics": "mysql_jdbc_products", "connection.url": "jdbc:mysql://localhost:3306/om_sink", "connection.user": "connect_dev", "connection.password": "connect_dev", "insert.mode": "upsert", "pk.mode": "record_key" "pk.fields": "product_id", "delete.enabled": "true", "table.name.format": "om_sink.products_sink", "key.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter" } } 실습하다가 발견해서 제보드립니다.해당부분 Git에 "pk.mode": "record_key" 다음에 쉼표가 없어 register_connector 가 작동되지 않았었습니다.쉼표를 추가하니 해결되었습니다 ㅎㅎ
- 미해결카프카 완벽 가이드 - 커넥트(Connect) 편
안녕하세요~~
kafka-console-consumer --bootstrap-server localhost:9092 --topic spooldir-test-topic --from-beginning --property print.key=true으로 명령어를 쳤을 경우 응답 값이 강사님의 경우에는 schema와 payload로 값이 나오는데 저 같은 경우에는Struct{} {"id":"1000","first_name":"Phineas","last_name":"Frede","email":"pfrederr@weather.com","gender":"Male","ip_address":"59.83.98.78","last_login":"2015-12-04T22:18:07Z","account_balance":"14095.22","country":"PK","favorite_color":"#4f2f2b"}이렇게 나오게 되는데요버전 차이 일까요? connector 설정은 아래와 같습니다{ "name": "csv_spooldir_source", "config": { "tasks.max": "3", "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector", "input.path": "/home/appuser/spool_test_dir", "input.file.pattern": "^.*\\.csv", "error.path": "/home/appuser/spool_test_dir/error", "finished.path": "/home/appuser/spool_test_dir/finished", "empty.poll.wait.ms": 30000, "halt.on.error": "false", "topic": "spooldir-test-topic", "csv.first.row.as.header": "true", "schema.generation.enabled": "true" } }감사합니다.
- 해결됨카프카 완벽 가이드 - 커넥트(Connect) 편
Incrementing 모드로 JDBC Source Connector 생성하기 질문
고생하십니다.질문은 해당 강의 진행중에 에러가 발생하여 질문드립니다. vi ~/connector_configs/mysql_jdbc_om_source_00.json을 통한 사전 설정값{ "name": "mysql_jdbc_om_source_00", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "tasks.max": "1", "connection.url": "jdbc:mysql://localhost:3306/om", "connection.user": "connect_dev", "connection.password": "connect_dev", "topic.prefix": "mysql_om_", "topic.creation.default.replication.factor": 1, "topic.creation.default.partitions": 1, "catalog.pattern": "om", "table.whitelist": "om.customers", "poll.interval.ms": 10000, "mode": "incrementing", "incrementing.column.name": "customer_id" } } 명령실행값http POST http://localhost:8083/connectors @mysql_jdbc_om_source_00.json명령 결과값HTTP/1.1 201 Created Content-Length: 566 Content-Type: application/json Date: Tue, 25 Apr 2023 01:47:21 GMT Location: http://localhost:8083/connectors/mysql_jdbc_om_source_00 Server: Jetty(9.4.44.v20210927) { "config": { "catalog.pattern": "om", "connection.password": "connect_dev", "connection.url": "jdbc:mysql://localhost:3306/om", "connection.user": "connect_dev", "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "incrementing.column.name": "customer_id", "mode": "incrementing", "name": "mysql_jdbc_om_source_00", "poll.interval.ms": "10000", "table.whitelist": "om.customers", "tasks.max": "1", "topic.creation.default.partitions": "1", "topic.creation.default.replication.factor": "1", "topic.prefix": "mysql_om_" }, "name": "mysql_jdbc_om_source_00", "tasks": [], "type": "source" } 이렇게 post를 통해서 올렸을 때, CONNECT쪽에서 아래와 같은 문제가 계속 발생하고 있습니다. Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:484)connect에 에러가 발생해 일단 강의 진행을 멈춘 상태입니다. 제가 혹시 놓친 부분이 있을지 질문드립니다.
- 해결됨카프카 완벽 가이드 - 커넥트(Connect) 편
안녕 하세요~~
2가지 궁금한게 있는데요첫번째는 카프카 커넥트와 직접 코드로 구현되는 컨슈머 중 하나를 선택해야 할 때 어떤 기준으로 선택하는 것이 좋을까요?커넥트 장점을 보면 추측은 가능하지만 경험이 없으니 실무에서 어떨 때는 커넥트, 어떨 때는 컨슈머 직접 구현을 선택할 수 있는지(이건 경험 꽤 많아요) 기준이 궁금합니다 두번재는 자동으로 토픽 생성시에 시스템 종료하면 어떤게 문제가 될까요?그레이스 풀 셧다운 관련인지 어떤 문제가 생기는지 궁금합니다 감사합니다.
- 미해결카프카 완벽 가이드 - 커넥트(Connect) 편
kafka 에 직접 넣은 데이터를 sink connector로 보내는 방법
안녕하세요 자주 질문을 드리게 되어 죄송합니다.데이터가 이동하면서 아래와 같은 작업이 이루어지는 것을 강의를 통해 이해했습니다.source connector --> kafka : 직렬화kafka --> sink connector : 역직렬화 그렇다면 혹시 java 를 사용하여 kafka로 바로 데이터를 적재 할 경우 sink connector로 데이터를 보낼 수가 있을까요?? 역직렬화 작업이 안될 것으로 보여서 가능한지 질문 드립니다.가능하지 않다면 java로 kafka를 보낼 때 직렬화 작업이 된 상태로 보내야 할까요??