월 19,800원
5개월 할부 시다른 수강생들이 자주 물어보는 질문이 궁금하신가요?
- 미해결카프카 완벽 가이드 - 커넥트(Connect) 편
Schema Registry 스키마 호환성 질문
안녕하세요, 늘 양질의 강의 제공해 주심에 감사합니다.강의를 듣다가 알쏭달쏭한 부분이 있어 질문드립니다. Schema Registry 를 활용하여 하위(Backward) 호환성 유지 시, 새로운 스키마를 설정되게 되면 Worker Conenct 를 강제로 실행시켜 줘서 Sink Connector 가 새로운 스키마가 갱신되도록 해줘야하는 절차가 꼭 필요한가요? (Consumer 가 새로운 버전으로 스키마 업데이트가 되도록 강제하기 위한 수동 절차가 꼭 필요한 것인지 궁금합니다. 아님 이러한 부분도 자동으로 Schema registry 에서 Sink 쪽으로 콜백을 주어 캐시된 스키마에 대한 정보 업데이트가 되도록 되는건지,,) (Forward 호환성인 경우) Source Connector 에서 V1 버전 스키마로 메세지를 발행하다가, 어느 순간 신규 스키마 버전 V2 로 메세지를 발행하게 되어 Schema registry 에 V2 가 등록이 된 상태에서, Sink 쪽에서는 이 메세지를 받게 되면 schema id : v2 인 정보를 카프카 메세지 내에 정보를 통해 알게 될텐데, 그럼에도 아직 워커 커넥트 내부 캐시된 스키마 정보엔 v2 가 없더라도, Forward 호환성인 경우에는 Sink Connector(Consumer) 입장에선 Schema registry 를 새로이 호출하여 스키마 업데이트를 하지 않는걸까요 ? 보통 운영환경에서 스키마 관리 자체를 DB source connector 에서 자동으로 만들어주는 스키마가 아니라 Schema registry 에 직접 새로운 스키마를 등록하여 운영환경에서 활용하시는지, 혹은 SOurce connector 의 자동 스키마 완성으로 사용하시는지 궁금 합니다. 양질의 강의에 다시한번 감사드리며 답변 부탁드리겠습니다. 좋은 하루 되세요.감사합니다.
- 미해결카프카 완벽 가이드 - 커넥트(Connect) 편
스키마 호환성 질문
안녕하세요https://www.inflearn.com/questions/1207899/%EC%8A%A4%ED%82%A4%EB%A7%88-%ED%98%B8%ED%99%98%EC%84%B1-%EC%A7%88%EB%AC%B8%EC%9E%88%EC%8A%B5%EB%8B%88%EB%8B%A4이 질문에 대한 추가 질문이 있습니다. BACKWARD에 대해 지원하는 신규 스키마 변경에서기존 필드 삭제, 기본 값 가진 신규 필드 추가이 부분은 sink쪽 스키마에서의 변경에 대한 내용인 것은 맞는건가요? 예를 들어 스키마 호환성 체크에서새로 추가되는 컬럼이 기본 값을 가지고 있지 않으면 하위 호환성 오류다. 라는 문장이 있다면"sink 쪽에서" 새로 추가되는 컬럼이 기본 값을 가지고 있지 않으면 하위 호환성 오류다. 라고 해석하는 것이 맞는지 질문드립니다.
- 미해결카프카 완벽 가이드 - 커넥트(Connect) 편
schema registry 포맷 지원
안녕하세요, schema registry를 사용할 때 avro를 사용해야한다고 말씀해주셨는데요, Json 포맷은 지원하지 않는건가요? 물론 avro의 경우 필드 정보가 payload에 빠지고 바이너리 형태이니 전송이 가장 빠르고 최적화된 포맷이라는 것은 알지만, Json 포맷도 설정을 통해 스키마를 지원할 수 있도록 할 수 있는데 schema registry의 지원 대상에서 제외되는 것인지 궁금합니다.
- 미해결카프카 완벽 가이드 - 커넥트(Connect) 편
Mysql bin log 질문있습니다
안녕하세요, postgresql의 logical replication에서는 publication이라고하는 모듈을 통해 복제 대상 테이블을 정해놓는데요 이렇게 되면 WAL에는 대상 테이블의 변경 내용만 담기는 건가요? 그리고 mysql에는 publication과 같은 모듈이 없는 것 같은데 bin 로그에는 그럼 모든 테이블의 트랜잭션 로그가 담기는 건지 궁금합니다.
- 미해결카프카 완벽 가이드 - 커넥트(Connect) 편
디비지움 소스 테이블 스키마 변경 질문
안녕하세요, 디비지움 소스 테이블 스키마 변경에 따른 자동 스키마 반영에 대한 질문이 있습니다. 소스 테이블에 변경이 있을 때 예를 들어 varchar 타입 컬럼에 default를 적용하는 변경을 수행하는 경우 sink 쪽에서 오류가 발생하는 것으로 알고 있습니다.그런데 만약 디비지움에 스키마 레지스트리가 적용되어 있고, 스키마 호환성을 체크한다고 한다면디비지움의 소스 테이블 변경 규칙은 무시될수도 있으며, 스키마 레지스트리 호환성에 따라 동작하게 되는 것인가요?
- 미해결카프카 완벽 가이드 - 커넥트(Connect) 편
스키마 호환성 질문있습니다.
안녕하세요, 스키마 레지스트리의 compatibility가 BACKWARD로 설정되어 있는 상태에서 nullable int 타입 컬럼을 source 쪽에서 추가하셨는데, BACKWARD 호환성의 경우는 읽기 쪽부터 변경스키마를 반영해야 하는 것이 아닌가요?nullable이므로 default가 있는 컬럼의 추가라서 BACKWARD를 만족한다는게 왜 소스쪽에서 추가한 내용 바탕으로 설명이 되는 것인지 이해가 되지 않습니다.
- 미해결카프카 완벽 가이드 - 커넥트(Connect) 편
[수정] mysql_jdbc_oc_sink_orders_datetime_tab_02.json "," 수정 요청
안녕하세요.. kafkaconnect 실습 수행 중 mysql_jdbc_oc_sink_orders_datetime_tab_02.json 문장안에 끝 부분에 "," 가 수정이 아직 안되어 있습니다. 깜빡 하신거 같네요..ㅎㅎ
- 해결됨카프카 완벽 가이드 - 커넥트(Connect) 편
auto.evolve 옵션 질문
안녕하세요, 질문이 있습니다. auto.evolve=true로 sink 쪽에 설정하고 source DB의 칼럼 정보를 업데이트 한 후에 레코드를 삽입하지 않으면 토픽에 메시지가 전송되지 않으니 target DB에는 반영되지 않는건가요?
- 미해결카프카 완벽 가이드 - 커넥트(Connect) 편
database.connectionTimezone 옵션 질문
안녕하세요 , "database.connectionTimezone": "Asia/Seoul"위와 같은 Source 커넥터의 timezone 옵션을 지정해도 Source 쪽에서 Timezone을 반영해서 보내는 것이 아니라면굳이 적용하지 않아도 괜찮은걸까요?적용을 해야하는 이유가 있는지 궁금합니다.
- 미해결카프카 완벽 가이드 - 커넥트(Connect) 편
Debezium snapshot 질문
안녕하세요,Debezium에서 snapshot을 수행할 때 읽어오는 DB 스키마는 커넥터가 어디에 저장해두는건가요?레코드의 경우 config에 설정된 topic에 저장되고, 또 offset의 경우 connect-offsets이라는 내부 토픽에 저장되는 것은 알겠는데, 테이블 DDL 정보는 따로 저장하는 곳이 있는 것인지 궁금합니다.
- 미해결카프카 완벽 가이드 - 커넥트(Connect) 편
해결하지 못한 에러가 발생 하였습니다.
안녕하세요 개발자님 에러를 해결 하지 못해 도움을 받고 싶습니다.ksqldb 마지막 강의를 마치고 실습을 하던중 mysql 테이블을topic으로 connector(avro)를 한후 ksqldb에서 table을 만드는 과정에서 타입 변환 문제가 발생 하였습니다. avro를 통해 register에 스키마를 저장 하고 사용 하고자 하였습니다.강의 해주신 .properties 설정은 하였구요.topic에 데이터 들어온느거 확인스키마 확인sqldb 테이블 생성은 되지만 검색시 밑에와 같은 에러가 발생합니다. source, sink connector 실습은 잘 되었으며, ksqldb 거치지 않고 ELK에 데이터도 잘 보내 집니다. ksqldb에서 table 생성 과정에서 PRIMARY KEY설정을 하고 생성이 됩니다. 하지만 검색을 하면 밑에와 같은 에러가 납니다.PRIMARY KEY없이 table을 생성하면 key값이 보내면 Json형태의 키로 배출됩니다. {CUSTOMER_ID=1}key 타입을 INTEGER, bigint, int 타입 해보았습니다.mysql table도 다른걸로 만들어보고 했습니다.혹시 네가 노친것이 무엇인가요?어떻게 해야 할까여? register 실행 로그를 보니 WARNING: A provider io.confluent.kafka.schemaregistry.rest.resources.SchemasResource regi stered in SERVER runtime does not implement any provider interfaces applicable in the SER VER runtime. Due to constraint configuration problems the provider io.confluent.kafka.sch emaregistry.rest.resources.SchemasResource will be ignored.Feb. 08, 2024 5:59:52 A.M. org.glassfish.jersey.internal.inject.Providers checkProviderRu ntime있습니다. 어떻게 해야 하나요? [2024-02-08 00:47:43,983] ERROR {"type":0,"deserializationError":{"target":"key","errorMessage":"Error deserializing message from topic: mysqlavro022.oc.customers","recordB64":null,"cause":["Cannot deserialize type struct as type int32 for path: "],"topic":"mysqlavro022.oc.customers"},"recordProcessingError":null,"productionError":null,"serializationError":null,"kafkaStreamsThreadError":null} (processing.transient_OC_CUSTOMER_3798951142359913405.KsqlTopic.Source.deserializer:44)[2024-02-08 00:47:43,988] WARN stream-thread [_confluent-ksql-default_transient_transient_OC_CUSTOMER_3798951142359913405_1707320860130-b2b59a3e-3875-4eab-ad2a-185533cf65bc-StreamThread-1] task [0_0] Skipping record due to deserialization error. topic=[mysqlavro022.oc.customers] partition=[0] offset=[0] (org.apache.kafka.streams.processor.internals.RecordDeserializer:89)org.apache.kafka.common.errors.SerializationException: Error deserializing message from topic: mysqlavro022.oc.customersat io.confluent.ksql.serde.connect.KsqlConnectDeserializer.deserialize(KsqlConnectDeserializer.java:55)at io.confluent.ksql.serde.tls.ThreadLocalDeserializer.deserialize(ThreadLocalDeserializer.java:37)at io.confluent.ksql.serde.unwrapped.UnwrappedDeserializer.deserialize(UnwrappedDeserializer.java:47)at io.confluent.ksql.serde.unwrapped.UnwrappedDeserializer.deserialize(UnwrappedDeserializer.java:26)at io.confluent.ksql.serde.GenericDeserializer.deserialize(GenericDeserializer.java:59)at io.confluent.ksql.logging.processing.LoggingDeserializer.tryDeserialize(LoggingDeserializer.java:61)at io.confluent.ksql.logging.processing.LoggingDeserializer.deserialize(LoggingDeserializer.java:48)at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)at org.apache.kafka.streams.processor.internals.SourceNode.deserializeKey(SourceNode.java:54)at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:65)at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:178)at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112)at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:304)at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:968)at org.apache.kafka.streams.processor.internals.TaskManager.addRecordsToTasks(TaskManager.java:1068)at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:962)at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:751)at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:604)at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:576)Caused by: org.apache.kafka.connect.errors.DataException: Cannot deserialize type struct as type int32 for path:at io.confluent.ksql.serde.connect.ConnectDataTranslator.throwTypeMismatchException(ConnectDataTranslator.java:71)at io.confluent.ksql.serde.connect.ConnectDataTranslator.validateType(ConnectDataTranslator.java:90)at io.confluent.ksql.serde.connect.ConnectDataTranslator.validateSchema(ConnectDataTranslator.java:154)at io.confluent.ksql.serde.connect.ConnectDataTranslator.toKsqlValue(ConnectDataTranslator.java:200)at io.confluent.ksql.serde.connect.ConnectDataTranslator.toKsqlRow(ConnectDataTranslator.java:54)at io.confluent.ksql.serde.avro.AvroDataTranslator.toKsqlRow(AvroDataTranslator.java:67)at io.confluent.ksql.serde.connect.KsqlConnectDeserializer.deserialize(KsqlConnectDeserializer.java:51)... 18 more user01@ubuntu-20:~/kafka/connector_configs/cdc_source_mysql$ register_connector cdc_source_mysql/mysql_cdc_ops_source_avro_01.jsonHTTP/1.1 201 CreatedContent-Length: 1007Content-Type: application/jsonDate: Wed, 07 Feb 2024 15:42:52 GMTLocation: http://localhost:8083/connectors/mysql_cdc_ops_source_avro_03Server: Jetty(9.4.44.v20210927){"config": {"connector.class": "io.debezium.connector.mysql.MySqlConnector","database.connectionTimezone": "Asia/Seoul","database.history.kafka.bootstrap.servers": "localhost:9092","database.history.kafka.topic": "schema-changes.mysql.oc","database.hostname": "192.168.0.26","database.include.list": "oc","database.password": "1234","database.port": "3306","database.server.id": "31002","database.server.name": "mysqlavro022","database.user": "cnt_dev","key.converter": "io.confluent.connect.avro.AvroConverter","key.converter.schema.registry.url": "http://localhost:8081","name": "mysql_cdc_ops_source_avro_03","table.include.list": "oc.customers","tasks.max": "1","time.precision.mode": "connect","transforms": "unwrap","transforms.unwrap.drop.tombstones": "false","transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState","value.converter": "io.confluent.connect.avro.AvroConverter","value.converter.schema.registry.url": "http://localhost:8081"},"name": "mysql_cdc_ops_source_avro_03","tasks": [],"type": "source"} user01@ubuntu-20:~/kafka/data/kafka-logs$ show_topic_messages avro mysqlavro022.oc.customers{"customer_id": 1}{"customer_id": 1,"email_address": "test","full_name": "test"}user01@ubuntu-20:~/kafka$ http GET http://localhost:8081/schemas{"id": 23,"schema": "{\"type\":\"record\",\"name\":\"Key\",\"namespace\":\"mysqlavro022.oc.customers\",\"fields\":[{\"name\":\"customer_id\",\"type\":\"int\"}],\"connect.name\":\"mysqlavro022.oc.customers.Key\"}","subject": "mysqlavro022.oc.customers-key","version": 1},{"id": 24,"schema": "{\"type\":\"record\",\"name\":\"Value\",\"namespace\":\"mysqlavro022.oc.customers\",\"fields\":[{\"name\":\"customer_id\",\"type\":\"int\"},{\"name\":\"email_address\",\"type\":\"string\"},{\"name\":\"full_name\",\"type\":\"string\"}],\"connect.name\":\"mysqlavro022.oc.customers.Value\"}","subject": "mysqlavro022.oc.customers-value","version": 1}, CREATE TABLE oc_customer (customer_id int PRIMARY KEY,email_address varchar,full_name varchar) WITH (KAFKA_TOPIC = 'mysqlavro022.oc.customers',KEY_FORMAT = 'AVRO',VALUE_FORMAT = 'AVRO'); ksql> describe oc_customer extended;Name : OC_CUSTOMERType : TABLETimestamp field : Not set - using <ROWTIME>Key format : AVROValue format : AVROKafka topic : mysqlavro022.oc.customers (partitions: 1, replication: 1)Statement : CREATE TABLE OC_CUSTOMER (CUSTOMER_ID INTEGER PRIMARY KEY, EMAIL_ADDRESS STRING, FULL_NAME STRING) WITH (KAFKA_TOPIC='mysqlavro022.oc.customers', KEY_FORMAT='AVRO', VALUE_FORMAT='AVRO');Field | Type------------------------------------------------CUSTOMER_ID | INTEGER (primary key)EMAIL_ADDRESS | VARCHAR(STRING)FULL_NAME | VARCHAR(STRING)------------------------------------------------Local runtime statistics------------------------
- 미해결카프카 완벽 가이드 - 커넥트(Connect) 편
디비지움 sink connector 사용에 대해 질문있습니다.
디비지움은 source connector로만 사용하고 sink는 jdbc connector를 사용한다고 하셨는데요,그 이유가 sink connector를 디비지움 커넥터로 구축 시 소스 DB의 DDL 변경을 타겟 DB에서 반영하기 어렵다는 것인가요? 만약 맞다면 JDBC sink에서는 DDL에 대해서 잘 반영하는 것인지 궁금합니다.
- 미해결카프카 완벽 가이드 - 커넥트(Connect) 편
timestamp 방식에서 table index 필수 여부
안녕하세요, 각 테이블 생성 시 timestamp 칼럼에 대해 index를 생성해 주셨는데요, 이거는 필수로 지정해야 하는건가요?
- 미해결카프카 완벽 가이드 - 커넥트(Connect) 편
key 값의 필요성에 대해 질문있습니다.
안녕하세요 JDBC Sink connector에 config 옵션으로 pk.mode를 통해 record key 값을 지정해주어야 하니, source connector 쪽에서도 transform을 통해 record key 값으로 pk를 추출해야 한다는 명분은 이해했는데요, value에 pk 값이 이미 있고, config를 통해 pk 필드가 무엇인지 까지 지정해줬는데 value를 통해 값을 획득하지 않고 record key에 다시 추출해야하는 이유가 뭘지 궁금합니다!
- 미해결카프카 완벽 가이드 - 커넥트(Connect) 편
connector plugin dir 질문있습니다.
안녕하세요, 강의에서 connector plugin.path 지정 시 직접 생성한 dir에 plugin 별 서브 dir를 두고 jar 파일을 옮겼는데요, 서브 디렉토리가 필요한 이유가 있을까요? 그리고 서브 디렉토리명은 임의로 지어도 되는 것인지 궁금합니다! 감사합니다.
- 미해결카프카 완벽 가이드 - 커넥트(Connect) 편
Connect Task 질문
안녕하세요 우선 좋은 강의 감사드립니다.다름이 아니라 강의에서 Connect Task 가 thread 로 작동한다고 말씀하신 부분을한 Worker process 내에서 여러 개의 thread(task) 로 병렬 처리를 할 수 있다 라고 이해했습니다. 그럼 혹시 여러 Task 가 동시에 접근할 수 있는 변수를 선언해서 사용할 수 있을까요? race_condition 같은 문제를 해결하기 위해 mutex 를 사용할 것 같긴 한데 이러한 구현이 가능한 지 궁금합니다. 감사합니다
- 해결됨카프카 완벽 가이드 - 커넥트(Connect) 편
응용프로그램에서 kafka 에 Producer로 직접 넣은 데이터를 sink connector를 이용해 DB로 받는 방법
안녕하세요.Kakfa를 이용 하던 중 다음의 상황을 Kafka (sink) Connector를 사용하여 구현하려 합니다. Clinet에는 응용프로그램이 Data를 수집하여 JSON 포멧으로 Producer를 구현하여 Kafka topic으로 Data를 전송함 (key : null, value : JSON 포멧의 Data)Kafka의 topic에 쌓여 있는 정보를 다른 응용프로그램이 Consumer를 구현하여 Data를 사용 중 (이와는 병렬로 추가로 DB에 저장 하는것이 목적입니다.)이 때 이 kafka topic에 저장 되는 data를 kafka sink connector를 사용하여 Database(RDBMS, MSSQL)에 저장을 하려 합니다. topic에 쌓이는 Data의 Schema가 변경이 될 수 있어,value로 들어 오는 값을 그대로 하나의 string 단일 컬럼으로 지정하여 schema registry에 등록 하여sink connector로 받아 갈 수 있을 까요? 아니면 더 간단한 방법이 있을 까요?(Value 값을 string 하나의 컬럼으로 DB로 가져가서 DB에서 parsing 하여 사용할 계획 입니다.)
- 미해결카프카 완벽 가이드 - 커넥트(Connect) 편
JDBC Sink Connector 에서 Topic Commit 처리 문의
안녕하세요 강사님JDBC Sink Connector 에서 Topic 의 파티션이 다수인 경우 Topic 에 대한 Commit 처리를 어떻게 하는지 궁금합니다.예를 들어 CDC Connector 로 저장된 Topic 을 MySQL 로 Sink 하고자 하는 경우이고 Topic 스키마에 당연히 키값은 있고 파티션이 10개 정도 된다고 했을때 Sink Connector 에서 최대 1000 개 데이터를 batch 로 DB에 처리하도록 설정했다면 Topic 에 대한 Commit 처리를 offset 정보를 loop 돌면서 commit 하는걸까요?소스를 참고할수있다면 소스 레벨로 알려주시면 감사하겠습니다.
- 미해결카프카 완벽 가이드 - 커넥트(Connect) 편
커넥트를 이용한 데이터 마이그레이션 질문드립니다
안녕하세요, 양질의 강의 항상 감사드립니다!다름이 아니라 현재 실무에서 스프링 배치를 이용해서 데이터를 DB to DB 로 마이그레이션하는 업무가 있는데 이를 강의에서 배운 카프카 커넥트를 활용하면 좋을것 같아서 고민중인데 잘 안풀리는 부분이 있어 질문드립니다 데이터 가공문제DB to DB 로 이관시 가장 좋은것은 커넥트만 써서 코드개발없이 마이그레이션 해버리는 것이 좋을것같은데 막상 실제 마이그레이션 할 때는 source 에서 퍼온 데이터를 sink 시 몇몇 컬럼은 데이터 가공하거나 없어지는 등 가공에 약간 손을 봐야하는 경우가 종종 있어서 이런 케이스에 대해 어떻게 해결을 해야할 지 고민입니다.고민을 해봤을 때 해결방안으로는 streams api 를 껴서 아래와 같은 아키텍쳐로 해결하는 방법이 있을 것 같은데요source -> topic -> kafka stream api 등을 통해 데이터 가공 후 topic 전송 -> sink 커스텀 sink 나 source 커넥터 개발어느 방법이나 결국 추가개발이 필요한건 매한가지라 현재 배치로 개발해놓은 구조를 커넥트를 이용하도록 바꿨을 때 이점이 명확하게 안보여서 좀 고민입니다... 데이터 마이그레이션 후 검증문제실무에서 배치로 마이그레이션 후 원본데이터가 있는 source db 와 마이그레이션 한 sink db 간에 건수비교 등 데이터 이관이 잘 되었는지 대사비교를 진행하는데요, 카프카 커넥트로 전환한다고 해도 이러한 대사비교는 여전히 필요하지 않을까 싶은데 실무에서 카프카 커넥트를 활용할 때 이러한 검증 문제를 어떻게 해결하는지 궁금합니다 카프카 커넥트를 실무에서 활용한 경험이 없다보니.... 강사님이라면 이러한 문제에 직면했을때 어떤 생각을 가지실 지 궁금합니다!감사합니다:)
- 미해결카프카 완벽 가이드 - 커넥트(Connect) 편
cdc 동기화 테스트 환경 초기화
안녕하세요열심히 수강중인 수강생입니다.mysql과 postgresql을 동기화하는 cdc를 테스트중인데무언가 꼬였나봅니다.초기화하는 방법이 궁금합니다.강의의 어느 부분을 참고하면 되는지 답변 부탁드립니다감사합니다.