inflearn logo
강의

강의

N
챌린지

챌린지

멘토링

멘토링

N
클립

클립

로드맵

로드맵

지식공유

카프카 완벽 가이드 - 커넥트(Connect) 편

mongodb sink connect 사용 중 update, delete 문제

1112

김진봉

작성한 질문수 2

0

안녕하세요.

선생님 강의를 듣고 kafka connect 매커니즘에 대해 상세하게 알게 됐습니다.

다만, 실무에 적용을 하는 도중 문제에 봉착해 도움을 구하고자 문의드립니다.

현재 debezium mysql connector를 사용하여 source 데이터는 topic으로 저장하는데 성공하였지만,

해당 데이터를 mongodb 에 저장하는데 저장/업데이트는 정상적으로 되지만 delete 시 반영이 안되는 문제가 있습니다.

RDB와는 다르게 mongodb sink connector는 insert.mode는 지원하지 않고

write model Strategy 를 활용하는 걸로 보이는데,

아래와 같이 sink connector를 설정할 경우 account_id 를 key 로 해서 업데이트는 가능한데, 삭제는 안되네요?

   "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",

   "document.id.strategy":"com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy",
   "document.id.strategy.partial.value.projection.list":"account_id",
   "document.id.strategy.partial.value.projection.type":"AllowList",
   "writemodel.strategy":"com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy",

혹시 Source 에서 입력, 업데이트, 삭제를 mongodb에 반영하려면 어떻게 해야 되는지 알 수 있을까요?

감사합니다.

 

kafka 데이터-엔지니어링

답변 1

0

권 철민

안녕하십니까,

mongodb는 저도 해보지 않아서 정확한 답변이 아닐 수 있지만,

먼저 debezium source connector에서 topic으로 delete message에 key값은 있지만 value가 null인지, 메시지를 확인해 보십시요.

그리고 sink connector에 아래를 설정해 보시지요.

"delete.on.null.values": "true"

해당 파라미터는 설명은 아래에 나와 있습니다.

https://www.mongodb.com/docs/kafka-connector/current/sink-connector/configuration-properties/id-strategy/

그런데 FullKeyStrategy, PartialKeyStrategy, and ProvidedInKeyStrategy

에서 동작한다고 되어 있군요. 테스트가 필요해 보입니다.

감사합니다.

 

0

김진봉

    "document.id.strategy":"com.mongodb.kafka.connect.sink.processor.id.strategy.PartialKeyStrategy",
    "document.id.strategy.partial.key.projection.list":"account_id",
    "document.id.strategy.partial.key.projection.type":"AllowList",
    "writemodel.strategy":"com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy",
    "delete.on.null.values":"true",

알려주신대로 delete.on.null.values 를 true로 하고, Write Strategy를 PartialKeyStrategy로 했는데도, 업데이트는 되지만 삭제는 안되네요.

삭제를 수행하면 topic에 아래와 같이 key는 넘어가고 value는 null로 들어갑니다.

{"schema":{"type":"struct","fields":[{"type":"string","optional":false,"default":"","field":"account_id"}],"optional":false,"name":"fri.testdb.accounts.Key"},"payload":{"account_id":"111"}}    null

혹시나 해서 여러가지 형태로 메시지를 변형해 봤는데도 안되네요.

{"schema":{"type":"struct","fields":[{"type":"string","optional":false,"default":"","field":"account_id"}],"optional":false,"name":"fri.testdb.accounts.Key"},"payload":{"account_id":"111"}}	{"schema":{"type":"struct","fields":[{"type":"string","optional":false,"default":"","field":"account_id"},{"type":"string","optional":true,"field":"role_id"},{"type":"string","optional":true,"field":"user_name"},{"type":"string","optional":true,"field":"user_description"},{"type":"int64","optional":true,"name":"io.debezium.time.Timestamp","version":1,"default":0,"field":"update_date"}],"optional":false,"name":"fri.testdb.accounts.Value"},"payload":{"account_id":"111",null}}
...
{"type":"int64","optional":true,"name":"io.debezium.time.Timestamp","version":1,"default":0,"field":"update_date"}],"optional":false,"name":"fri.testdb.accounts.Value"},"payload":{"account_id":"111"}}
{"schema":{"type":"struct","fields":[{"type":"string","optional":false,"default":"","field":"account_id"}],"optional":false,"name":"fri.testdb.accounts.Key"},"payload":{"account_id":"111"}}

좀 더 테스트를 해봐야겠습니다. ㅠㅠ

감사합니다.

계속 csv파일을 폴더로 인식하는 중입니다.

0

69

2

cdc failover

0

70

2

avro plugin에 안보이는거 같아요

0

70

2

zookeeper가 실행이 안되요

0

80

3

Debezium CDC Source Connector 이벤트 메시지

0

87

2

재 질문! 다른 connector가 동일 토픽 사용

0

68

2

메시지 발행이 안 됩니다.

0

78

2

디비지움 오프셋 리셋

0

96

2

mysql_jdbc_oc_sink_customers_02.json 에서 오류납니다.

0

72

3

io.debezium.connector.mysql.MySqlConnector 질문

0

68

2

Debezium 이벤트 메시지 발행 시 성공여부

0

82

2

connect 구동 오류

0

99

3

CDC Connect 통시에서의 암호화 관련 하여 문의 드립니다.

0

122

2

GCSSinkconnector

0

66

1

debezium source connector에서 `poll.interval.ms` 파라미터

0

145

2

자문자답: JDBC Sink Connector가 PostgreSQL의 Schema를 바라보지 않습니다.

0

214

1

table.name.format에 관하여..

0

197

2

binlog와 offset 관련 추가 질문드립니다

0

152

1

Debezium cdc source for MSSQL-Server [등록오류]

0

193

2

http http://localhost:8083/connector-plugins 시 플러그인이 보이지 않습니다.

0

192

2

안녕하세요 sink connector 생성 config 관련 질문입니다.

0

165

2

수강환경 VirtualBox -> Docker

0

232

2

일반적인 예시들이 궁금해서 질문 남깁니다.

1

149

2

debezium connector schema 찾지 못하는 오류

0

437

2