inflearn logo
강의

강의

N
챌린지

챌린지

멘토링

멘토링

N
클립

클립

로드맵

로드맵

지식공유

카프카 완벽 가이드 - 커넥트(Connect) 편

응용프로그램에서 kafka 에 Producer로 직접 넣은 데이터를 sink connector를 이용해 DB로 받는 방법

해결된 질문

675

lkkikaka

작성한 질문수 1

0

안녕하세요.

Kakfa를 이용 하던 중 다음의 상황을 Kafka (sink) Connector를 사용하여 구현하려 합니다.

 

  1. Clinet에는 응용프로그램이 Data를 수집하여 JSON 포멧으로 Producer를 구현하여 Kafka topic으로 Data를 전송함 (key : null, value : JSON 포멧의 Data)

  2. Kafka의 topic에 쌓여 있는 정보를 다른 응용프로그램이 Consumer를 구현하여 Data를 사용 중 (이와는 병렬로 추가로 DB에 저장 하는것이 목적입니다.)

이 때 이 kafka topic에 저장 되는 data를 kafka sink connector를 사용하여 Database(RDBMS, MSSQL)에 저장을 하려 합니다.

 

topic에 쌓이는 Data의 Schema가 변경이 될 수 있어,

value로 들어 오는 값을 그대로 하나의 string 단일 컬럼으로 지정하여 schema registry에 등록 하여

sink connector로 받아 갈 수 있을 까요? 아니면 더 간단한 방법이 있을 까요?

(Value 값을 string 하나의 컬럼으로 DB로 가져가서 DB에서 parsing 하여 사용할 계획 입니다.)

 

 

kafka 데이터-엔지니어링

답변 1

0

권 철민

안녕하십니까,

음, 의도는 알겠습니다만, 좀 이슈들이 있을 것 같습니다.

먼저 schema registry 에서 동작하려면 topic이 Avro 타입과 같은 형태가 되어야 합니다.

지금 topic은 그냥 json 형태로만 value가 되어 있는 것 같습니다.

topic이 Avro 타입이라고 하더라도, 지금 schema format이 기존 json format에서 명시된 컬럼명/컬럼 타입이 사용되는 것이 아니라 string 하나 컬럼으로 변경되므로 해당 json format이 아니라 별도 json format으로 해당 string 컬럼을 schema 정보로 표시하고, payload값을 기존의 json format이 포함된 문자열 값으로 변환이 되어야 합니다.

제 생각엔 해당 토픽이 Avro가 아니어서 schema registry는 어렵지만, 만약에 Schema와 Payload가 분리된 형태의 Json 포맷이라면 그냥 JSON Converter로 Sink Connector를 구성해서 MSSQL에 입력하면 좋을 것 같습니다. 물론 컬럼 스키마가 자주 변경된 다면 Connector 를 수동으로 내리고 컬럼 스키마 변경 작업을 DB에서 수동으로 적용해야 하지만, 기존에 생각하신 방향대로 Parsing을 적용해서 한다면 컬럼 스키마가 변경될 경우 기존에 만들었던 Parsing 로직을 다 변환해서 적용되어야 하므로 이게 더 어려운 작업이 될 것 같습니다.

원하시는 답변이 아니면 다시 글 부탁드립니다.

감사합니다.

0

lkkikaka

답변 감사드립니다

해결하려 하는 상황이 현재 json 포멧의 string이 입력되고 있는 topic(해당 내용에는 schema, payload 가 없고 payload안에 입력되어지는 메세지 값만 json 포멧으로 되어 있습니다)에 있는 값을 sink connector 를 사용하여 mssql db에 저장 하려 합니다

db의 schema에 따라 파싱이 되지 않고 text 형태로 메세지가 담기면 됩니다

방법이 없을 까요?

1

권 철민

DB로 값을 입력하는 Sink Connector는 스키마 정보가 반드시 있어야 만 합니다. payload 만으로는 데이터를 넣을 수 없습니다. 이 경우는 connector가 아니라 Consumer 코드를 직접 만들어야 할 것 같습니다.

계속 csv파일을 폴더로 인식하는 중입니다.

0

69

2

cdc failover

0

70

2

avro plugin에 안보이는거 같아요

0

70

2

zookeeper가 실행이 안되요

0

80

3

Debezium CDC Source Connector 이벤트 메시지

0

87

2

재 질문! 다른 connector가 동일 토픽 사용

0

68

2

메시지 발행이 안 됩니다.

0

78

2

디비지움 오프셋 리셋

0

96

2

mysql_jdbc_oc_sink_customers_02.json 에서 오류납니다.

0

72

3

io.debezium.connector.mysql.MySqlConnector 질문

0

68

2

Debezium 이벤트 메시지 발행 시 성공여부

0

82

2

connect 구동 오류

0

99

3

CDC Connect 통시에서의 암호화 관련 하여 문의 드립니다.

0

122

2

GCSSinkconnector

0

66

1

debezium source connector에서 `poll.interval.ms` 파라미터

0

145

2

자문자답: JDBC Sink Connector가 PostgreSQL의 Schema를 바라보지 않습니다.

0

214

1

table.name.format에 관하여..

0

197

2

binlog와 offset 관련 추가 질문드립니다

0

152

1

Debezium cdc source for MSSQL-Server [등록오류]

0

193

2

http http://localhost:8083/connector-plugins 시 플러그인이 보이지 않습니다.

0

192

2

안녕하세요 sink connector 생성 config 관련 질문입니다.

0

165

2

수강환경 VirtualBox -> Docker

0

232

2

일반적인 예시들이 궁금해서 질문 남깁니다.

1

149

2

debezium connector schema 찾지 못하는 오류

0

437

2