inflearn logo
강의

Course

Instructor

Kafka Complete Guide - ksqlDB

table의 데이터가 실시간으로 topic에 담기지 않습니다

226

kkmdevel

9 asked

0

mysql에 debezium source connector로 topic에 가져온 데이터를 받는 stream을 만들고

그 stream을 기반으로 하여

CREATE TABLE timeout WITH (KAFKA_TOPIC='timeout' , KEY_FORMAT='AVRO', VALUE_FORMAT='AVRO', PARTITIONS=1) AS
> SELECT
> order_id -> order_id AS  order_id,
> TIMESTAMPADD(MILLISECONDS, 9 * 3600 * 1000, PARSE_TIMESTAMP(LATEST_BY_OFFSET(order_datetime), 'yyyy-MM-dd''T''HH:mm:ssX')) AS last_log_time
> FROM orders
> GROUP BY order_id -> order_id
> HAVING ((UNIX_TIMESTAMP(CONVERT_TZ(FROM_UNIXTIME(UNIX_TIMESTAMP()), 'UTC', 'Asia/Seoul')) - UNIX_TIMESTAMP(TIMESTAMPADD(MILLISECONDS, 9 * 3600 * 1000, PARSE_TIMESTAMP(LATEST_BY_OFFSET(order_datetime), 'yyyy-MM-dd''T''HH:mm:ssX')))) / 1000 > 600)
> EMIT CHANGES;

이런식으로 id별로 마지막 로그 시간이 오고 10분 이상이 지나면 table에 담기도록 만들었습니다

처음에 이미 10분이 지난 데이터를 넣으면 table에도 들어가고 topic에도 잘 들어가는데

현재시간의 데이터를 넣고 10분이 지나면 table에는 들어가는데 topic에는 들어가지 않습니다

table에도 담기고 topic에도 담기려면 어떻게 해야하나요? 아니면 원래 불가능한건가요?
기반한 stream은 데이터를 넣으면 곧 바로 stream과 토픽에 잘 들어갑니다.

|ORDER_ID                                  |CALCULATED_TIME                           |LAST_LOG_TIME                             |
+------------------------------------------+------------------------------------------+------------------------------------------+
|1                                         |78088                                     |2024-06-16T12:30:00.000                   |
|2                                         |69988                                     |2024-06-16T14:45:00.000                   |
|3                                         |72088                                     |2024-06-16T14:10:00.000                   |
|4                                         |32739088                                  |2023-06-04T12:00:00.000                   |
|5                                         |32637088                                  |2023-06-05T16:20:00.000                   |
|6                                         |32567788                                  |2023-06-06T11:35:00.000                   |
|7                                         |69058                                     |2024-06-16T15:00:30.000                   |
|8                                         |68698                                     |2024-06-16T15:06:30.000                   |
|9                                         |66958                                     |2024-06-16T15:35:30.000                   |
|10                                        |65698                                     |2024-06-16T15:56:30.000                   |
|11                                        |66298                                     |2024-06-16T15:46:30.000                   |
|12                                        |4258                                      |2024-06-17T09:00:30.000                   |
|13                                        |3418                                      |2024-06-17T09:14:30.000                   |
|14                                        |1918                                      |2024-06-17T09:39:30.000                   |
|15                                        |2429                                      |2024-06-17T09:30:59.000                   |
Query terminated
ksql> print result7777;
Key format: AVRO or KAFKA_STRING
Value format: AVRO
rowtime: 2024/06/16 04:23:23.878 Z, key: 1, value: {"CALCULATED_TIME": 12183, "LAST_LOG_TIME": 1718541000000}, partition: 0
rowtime: 2024/06/16 04:23:23.879 Z, key: 2, value: {"CALCULATED_TIME": 4083, "LAST_LOG_TIME": 1718549100000}, partition: 0
rowtime: 2024/06/16 05:10:08.498 Z, key: 3, value: {"CALCULATED_TIME": 6183, "LAST_LOG_TIME": 1718547000000}, partition: 0
rowtime: 2024/06/16 06:06:52.365 Z, key: 4, value: {"CALCULATED_TIME": 32673183, "LAST_LOG_TIME": 1685880000000}, partition: 0
rowtime: 2024/06/16 06:06:52.373 Z, key: 5, value: {"CALCULATED_TIME": 32571183, "LAST_LOG_TIME": 1685982000000}, partition: 0
rowtime: 2024/06/16 06:06:52.377 Z, key: 6, value: {"CALCULATED_TIME": 32501883, "LAST_LOG_TIME": 1686051300000}, partition: 0
rowtime: 2024/06/16 06:09:36.530 Z, key: 7, value: {"CALCULATED_TIME": 3153, "LAST_LOG_TIME": 1718550030000}, partition: 0
rowtime: 2024/06/16 06:15:08.351 Z, key: 8, value: {"CALCULATED_TIME": 2793, "LAST_LOG_TIME": 1718550390000}, partition: 0
rowtime: 2024/06/16 06:41:28.920 Z, key: 9, value: {"CALCULATED_TIME": 1053, "LAST_LOG_TIME": 1718552130000}, partition: 0
rowtime: 2024/06/17 00:23:09.442 Z, key: 12, value: {"CALCULATED_TIME": 1372, "LAST_LOG_TIME": 1718614830000}, partition: 0

1-9, 12 이미 10분이 지난 데이터 // 그 외 = 데이터가 mysql에 담기고 10분이 지나 table에 담긴 데이터

kafka 데이터-엔지니어링 ksqldb

Answer 1

0

dooleyz3525

안녕하십니까,

먼저 질문 내용을 명확히 하고 싶습니다.

  1. mysql에 debezium source connector로 topic에 가져온 데이터를 받는 stream을 만들고

그 stream을 기반으로 하여

=> 그 stream 명이 orders 인가요? 일단 orders 라고 가정하고

  1. 처음에 이미 10분이 지난 데이터를 넣으면 table에도 들어가고 topic에도 잘 들어가는데

    현재시간의 데이터를 넣고 10분이 지나면 table에는 들어가는데 topic에는 들어가지 않습니다

=> CREATE TABLE timeout WITH (KAFKA_TOPIC='timeout' ,....)

로 Table명 timeout을 만드셨는데, KAFKA_TOPIC 명도 timeout 으로 만드셨습니다. 이 topic에 데이터가 안들어 간다는 건가요? 헷갈릴 수 있으니까 일단 kafka_topic명을 다른걸로 변경해 보시고 이 topic에 데이터가 들어가는지 확인해 보십시요.

그런데 위와 같이 하면 해당 topic에 데이터가 들어가야 합니다. table에 데이터가 만들어지면서 동시에 kafka_topic에 write가 되어야 보여지게 됩니다. 다시 한번 확인 부탁드립니다.

그리고 두번째 캡처 이미지에 print result7777 이라고 되어 있는데, 이건 어떤 topic인건지요?

위 내용 다시 확인 부탁드립니다.

 

감사합니다.

 

0

kkmdevel

죄송합니다 급하게 글을 쓰느라 정확하지 못하였습니다.
다시 정리하자면 source connector로 받아서 넣은 원본 stream은 raw_orders입니다.

select * from raw_orders;
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|ORDER_ID            |ORDER_DATETIME      |CUSTOMER_ID         |ORDER_STATUS        |PRICE               |STORE_ID            |
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|{ORDER_ID=1}        |2024-06-17T00:00:00Z|1001                |Completed           |59.99               |1                   |
|{ORDER_ID=2}        |2024-06-17T01:15:00Z|1002                |Pending             |29.99               |2                   |
|{ORDER_ID=3}        |2024-06-17T02:30:00Z|1003                |Cancelled           |19.99               |3                   |
|{ORDER_ID=4}        |2024-06-17T06:30:00Z|1004                |Cancelled           |190.99              |3                   |
|{ORDER_ID=5}        |2024-06-16T06:30:00Z|1004                |Cancelled           |190.99              |3                   |
|{ORDER_ID=6}        |2024-06-16T06:30:00Z|1006                |yes                 |1190.99             |6                   |
|{ORDER_ID=7}        |2024-06-18T00:20:30Z|1007                |yes                 |1190.99             |7                   |

총 7개의 데이터를 넣었습니다.

이 stream을 기반으로 하여 10분이 지나면 table로 가져오게끔 만들었습니다

CREATE TABLE timeout WITH (KAFKA_TOPIC='timeout' , KEY_FORMAT='AVRO', VALUE_FORMAT='AVRO', PARTITIONS=1) AS
> SELECT
> order_id -> order_id AS  order_id,
> TIMESTAMPADD(MILLISECONDS, 9 * 3600 * 1000, PARSE_TIMESTAMP(LATEST_BY_OFFSET(order_datetime), 'yyyy-MM-dd''T''HH:mm:ssX')) AS last_log_time
> FROM raw_orders
> GROUP BY order_id -> order_id
> HAVING ((UNIX_TIMESTAMP(CONVERT_TZ(FROM_UNIXTIME(UNIX_TIMESTAMP()), 'UTC', 'Asia/Seoul')) - UNIX_TIMESTAMP(TIMESTAMPADD(MILLISECONDS, 9 * 3600 * 1000, PARSE_TIMESTAMP(LATEST_BY_OFFSET(order_datetime), 'yyyy-MM-dd''T''HH:mm:ssX')))) / 1000 > 600)
> EMIT CHANGES;

1-6번까지의 데이터는 이미 설정해둔 시간이 지나서 table에는 담겨있습니다.

 select * from timeout;
+----------------------------------------------------------------+----------------------------------------------------------------+
|ORDER_ID                                                        |LAST_LOG_TIME                                                   |
+----------------------------------------------------------------+----------------------------------------------------------------+
|1                                                               |2024-06-17T09:00:00.000                                         |
|2                                                               |2024-06-17T10:15:00.000                                         |
|3                                                               |2024-06-17T11:30:00.000                                         |
|4                                                               |2024-06-17T15:30:00.000                                         |
|5                                                               |2024-06-16T15:30:00.000                                         |
|6                                                               |2024-06-16T15:30:00.000                                         |

하지만 7번 데이터가 들어오고 10분이 지나서 table에는 7번 데이터가 담기는데
topic에는 담기지 않습니다.

select * from timeout;
+----------------------------------------------------------------+----------------------------------------------------------------+
|ORDER_ID                                                        |LAST_LOG_TIME                                                   |
+----------------------------------------------------------------+----------------------------------------------------------------+
|1                                                               |2024-06-17T09:00:00.000                                         |
|2                                                               |2024-06-17T10:15:00.000                                         |
|3                                                               |2024-06-17T11:30:00.000                                         |
|4                                                               |2024-06-17T15:30:00.000                                         |
|5                                                               |2024-06-16T15:30:00.000                                         |
|6                                                               |2024-06-16T15:30:00.000                                         |
|7                                                               |2024-06-18T09:20:30.000                                         |
Query terminated
ksql> print timeout;
Key format: AVRO or KAFKA_STRING
Value format: AVRO
rowtime: 2024/06/17 01:51:02.907 Z, key: 1, value: {"LAST_LOG_TIME": 1718614800000}, partition: 0
rowtime: 2024/06/17 01:51:02.922 Z, key: 2, value: {"LAST_LOG_TIME": 1718619300000}, partition: 0
rowtime: 2024/06/17 02:19:38.295 Z, key: 5, value: {"LAST_LOG_TIME": 1718551800000}, partition: 0
rowtime: 2024/06/18 00:24:06.884 Z, key: 6, value: {"LAST_LOG_TIME": 1718551800000}, partition: 0
^CTopic printing ceased

참고로 3,4번도 같은 방식으로 진행했지만 담기지 않았습니다.

어떻게하면 좋을까요?

0

dooleyz3525

원본 stream 명은 raw_orders인데, 아래 CTAS를 보면 SELECT를 orders에서 하는데, orders는 어떤 거인지요?

Create table timeout

as

select ...

from orders

0

kkmdevel

죄송합니다 오타가 있었습니다 raw_orders에서 받아온게 맞습니다

0

kkmdevel

질문글에 오타입니다. 진행은 raw_orders로 했습니다. 시간이 지나고 table에는 담기지만
topic에는 담기지않습니다..

0

dooleyz3525

음, 이건 원인을 정확히 모르겠군요.

먼저 아래와 같이

  1. 토픽명을 바꿔 보셨나요? 헷갈릴수 있으니 토픽명을 일단 변경해 보시지요. 그리고

  1. CTAS에서 Having절을 삭제해 보시고, 다시 한번 테스트 해보시지요.

  2. 1번이 마찬가지면 일단 consumer로 데이터를 확인해 보시지요.

     

    kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic 토픽명 --from-beginning

Ksqldb.io 가 confluent.io 로 리다이렉션 되요

0

49

2

복합키 디코딩 오류 질문드립니다.

0

75

2

Table의 상태(Stateful) 관리 질문

0

102

2

섹션 10 관련 강의자료

0

182

2

Compact Topic에 대하여

0

206

2

시스템 타임존과 카프카 Stream 타임존이 불일치합니다.

0

143

1

ksql DB 서버를 올릴떄 아래와 같이 에러 발생

0

147

2

스트림, 테이블 생성시 데이터 관련 문의

0

216

1

푸시 쿼리 종료 방법에 대해 문의 드립니다.

0

195

1

debezium에서 ksqldb로

0

254

2

CLI로 실행과 코드로 실행하면 결과가 다르게 나옵니다

1

177

1

ksqldb는 workbench처럼 ui는 없을까요?

0

309

2

Pull 쿼리 제약에 대한 이유

0

218

1

IoT Event Streaming 적용에 대해서

0

210

1

[수정요청] Join이해 중 select inner join a.user_id 수정 필요

0

273

2

실무에서 카프카 환경 구축

0

533

2

[수정요청] Mview CSAS 강좌중에 Insert문장 수정 요청

0

248

1

inner join , outer join

0

199

1

group by 리파티션에 대한 질문

0

241

1

ksqldb timestamp 타입 질문

0

245

1

stream format 관련 질문

0

271

1

전통적 분석 시스템 한계에 대해 질문있습니다.

0

277

1

AWS 에서 confluent kafka 와 apache kafka 차이가 궁금 합니다.

0

1688

2

ksqlDB Cluster 여부 - 박성범님 질문(제가 대신해서 적습니다)

0

669

2