강의

멘토링

로드맵

Inflearn Community Q&A

kangplay9003's profile image
kangplay9003

asked

Kafka Complete Guide - ksqlDB

복합키 디코딩 오류 질문드립니다.

Written on

·

22

·

Edited

0

안녕하세요 강사님. 좋은 강의 감사드립니다. 항상 강의 잘듣고 있습니다 😀

다름이 아니라, 복합키 디코딩 관련 질문이 있습니다.

 

아래와 같이 Group By와 Window Session 함수를 결합한 CTAS절입니다.

CREATE OR REPLACE TABLE MASTER

WITH (

KAFKA_TOPIC = 'master',

KEY_FORMAT = 'JSON',

VALUE_FORMAT = 'JSON' )

AS SELECT

TRID AS KEY,

AS_VALUE(TRID) AS "trid",

WINDOWSTART AS "min_time",

WINDOWEND AS "max_time",

(WINDOWEND - WINDOWSTART) AS "duration",

MINtimestamp) AS "@timestamp",

COLLECT_LIST(service) AS services,

COLLECT_SET(system) AS systems

FROM ORIGINAL_STREAM

WINDOW SESSION (5 SECONDS)

GROUP BY TRID EMIT CHANGES;

 

제가 기대한 값으로는 master라는 토픽의 key에 trid와 windowstart 값으로 결합된 JSON 형식의 값이 저장되는 것이었습니다. ksqldb에서 print 문으로 topic을 조회하면 잘 읽히지만, kafka-consumer에서 topic을 조회하면, 디코딩 부분에서 깨져서 조회가 됩니다.

명령어: ./kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic cpm_master --from-beginning --property print.key=true

현재는 총 두 개의 쿼리를 추가적으로 사용하여 id 값을 컨슈머가 읽을 수 있도록 정제하고 있습니다. 혹시 이 문제에 대해서 아신다면 답변 주시면 감사하겠습니다!

 

kafka데이터-엔지니어링ksqldb

Answer 2

0

dooleyz3525님의 프로필 이미지
dooleyz3525
Instructor

안녕하십니까,

먼저 ksqlDB에서 Window aggregation 절을 사용하면 key가 windowed key로 바뀌게 되어서 json format이 아니라 ksqDB의 전용 Window format으로 바뀌게 됩니다.

Master Topic를 다시 별도의 Stream Topic으로 재 stream 해보시면 어떨까요? 그리고 이걸 consumer에서 읽어 보시지요.

CREATE STREAM MASTER_FLAT

WITH (KAFKA_TOPIC='master_flat', KEY_FORMAT='JSON', VALUE_FORMAT='JSON')

AS SELECT TRID AS KEY, *

FROM MASTER

EMIT CHANGES;

 

감사합니다.

 

kangplay님의 프로필 이미지
kangplay
Questioner

네 감사합니다. 🙂

kangplay님의 프로필 이미지
kangplay
Questioner

그러면, MASTER 테이블로만으로 생성된 Topic은 elasticsearch sink에서 읽을 수 없는건가요? key 값 디코딩 중 오류가 계속 발생하네요 ㅠ 총 네 개의 스트림을 생성해야만 되는건지 궁금합니다!

dooleyz3525님의 프로필 이미지
dooleyz3525
Instructor

MASTER 테이블에서 오류가 나면 Re-Stream으로 생성한 MASTER_FLAT에 elasticsearch sink 를 적용해 보시지요.

0

안녕하세요, 인프런 AI 인턴입니다. kangplay님.

복합키 디코딩 오류와 관련된 질문을 주셨군요. Kafka의 각종 형식이 상호간의 디코딩 오류를 일으킬 수 있는 것은 흔한 문제입니다. 특히, ksqlDB와 Kafka 콘솔 소비자를 사용할 때 JSON 형식에서 문제가 발생할 수 있습니다.

ksqlDB에서 창(Window) 기능과 함께 복합키를 생성한 경우, 이 키는 종종 복잡한 구조를 가지며, JSON이나 다른 형식으로 직렬화됩니다. ksqlDB 내에서는 이러한 구조적 데이터를 문제없이 처리할 수 있지만, Kafka 콘솔 소비자에서 이를 직렬화/역직렬화하려고 할 때 문제가 발생할 수 있습니다.

문제를 해결하려면 다음의 두 가지 방안을 고려해볼 수 있습니다:

  1. ksqlDB와 호환되는 형식 사용: ksqlDB에서 데이터를 생성할 때, 출력 형식을 JSON이 아닌, UTF-8로 지원되는 String 형식으로 변환을 시도해 볼 수 있습니다. 이렇게 하면, Kafka 콘솔 소비자에서도 데이터를 좀 더 명확하게 읽을 수 있을 것입니다. 이 경우, JSON 구조를 평탄화(flatten) 하는 방법도 고려해 볼 수 있습니다.

  2. 타사 도구 활용: Kafka에서는 kafka-json-schema-console-consumer 같은 도구를 사용하여 JSON 형식의 메시지를 보다 올바르게 역직렬화하여 확인할 수 있습니다. 이러한 도구는 Schema Registry를 통해 JSON 스키마를 가져와 보다 정확하게 데이터를 처리할 수 있습니다.

다음 링크에서 비슷한 주제를 다룬 기존 질문과 답변을 참조해 보실 수 있습니다:
- ‘프로듀서 직렬화 관련 질문 있습니다’
- ‘JDBC source connector - SQL server 질문’

저는 질문자님의 이해를 돕고자 지식공유자님의 기존 답변을 참고하여 유사한 답변 링크를 추천해드리고 있습니다. 현재 베타 기능으로 답변이 만족스럽지 않을 수 있는 점 양해 부탁드립니다. 🙏

추가적으로 궁금한 점이 있으시면, 이어서 질문해 주세요. 곧 지식공유자께서 답변해 주실 것입니다.

kangplay9003's profile image
kangplay9003

asked

Ask a question