inflearn logo
강의

Course

Instructor

Kafka Complete Guide - ksqlDB

CLI로 실행과 코드로 실행하면 결과가 다르게 나옵니다

Resolved

177

kkmdevel

9 asked

1

CREATE STREAM add_stream WITH (KAFKA_TOPIC='column_stream_topic', VALUE_FORMAT='JSON') AS SELECT *, CAST(NULL AS INT) AS new_column FROM test_stream EMIT CHANGES;

이렇게 기존 test_stream에서 column을 추가한 add_stream을 만들려고 CLI문을 실행시키면

원래 test_stream에 담겨있는 data가 담아져서 나오는데

package com.example.service;

import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.ClientOptions;
import io.confluent.ksql.api.client.ExecuteStatementResult;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.concurrent.ExecutionException;

@Service
public class streamPracticeAdd {

    @Value("${ksqldb.server.host}")
    private String ksqlDbHost;

    private int ksqlDbPort;

    private Client client;

    @PostConstruct
    public void init() {
        ClientOptions options = ClientOptions.create()
                .setHost(ksqlDbHost)
                .setPort(ksqlDbPort);
        client = Client.create(options);
    }

    public void streamsAdd(String columnName, String dataType) {

        String createStreamKsql = "CREATE STREAM add_stream WITH (KAFKA_TOPIC='column_stream_topic', VALUE_FORMAT='JSON') AS SELECT *, CAST(NULL AS " + dataType + ") AS " + columnName + " FROM test_stream EMIT CHANGES;";
        try {
            ExecuteStatementResult result = client.executeStatement(createStreamKsql).get();
            System.out.println("Stream created and data inserted into new topic: " + result.queryId());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

/kafka/addColumn/new_column/INT 인 API 요청을 줘서 새 stream을 만드는 코드인데
실행시키면 기존 column에 새 column까지 추가는 되는데 기존 data가 하나도 들어오지 않습니다.

검색을 해봤는데도 잘 안나와서 질문 남깁니다 감사합니다.

kafka 데이터-엔지니어링 ksqldb

Answer 1

0

dooleyz3525

안녕하십니까,

저도 Java API 인터페이스는 잘 모르지만,

streamsAdd() 메소드를 수행하면 add_stream Stream은 만들어지는데, test_stream에 데이타를 입력하면 add_stream에서 데이터가 출력되지 않는다는 건가요? 만약 그렇다면,

  1. streamsAdd()메소드에서 stream 이름을 add_stream이 아니라 add_stream_new로 해서 새롭게 만들어 보시고, 함 테스트 해보시지요. 그리고 CLI에서 add_stream_new select 해서 데이터가 나오는지 확인해 보십시요.

안되면 다시 글 부탁드립니다.

감사합니다.

0

kkmdevel

다시 실행하였더니 수행됐습니다 감사합니다

Ksqldb.io 가 confluent.io 로 리다이렉션 되요

0

49

2

복합키 디코딩 오류 질문드립니다.

0

75

2

Table의 상태(Stateful) 관리 질문

0

102

2

섹션 10 관련 강의자료

0

182

2

Compact Topic에 대하여

0

206

2

시스템 타임존과 카프카 Stream 타임존이 불일치합니다.

0

143

1

ksql DB 서버를 올릴떄 아래와 같이 에러 발생

0

147

2

스트림, 테이블 생성시 데이터 관련 문의

0

216

1

푸시 쿼리 종료 방법에 대해 문의 드립니다.

0

195

1

table의 데이터가 실시간으로 topic에 담기지 않습니다

0

226

1

debezium에서 ksqldb로

0

254

2

ksqldb는 workbench처럼 ui는 없을까요?

0

309

2

Pull 쿼리 제약에 대한 이유

0

218

1

IoT Event Streaming 적용에 대해서

0

210

1

[수정요청] Join이해 중 select inner join a.user_id 수정 필요

0

273

2

실무에서 카프카 환경 구축

0

533

2

[수정요청] Mview CSAS 강좌중에 Insert문장 수정 요청

0

248

1

inner join , outer join

0

199

1

group by 리파티션에 대한 질문

0

241

1

ksqldb timestamp 타입 질문

0

245

1

stream format 관련 질문

0

271

1

전통적 분석 시스템 한계에 대해 질문있습니다.

0

277

1

AWS 에서 confluent kafka 와 apache kafka 차이가 궁금 합니다.

0

1688

2

ksqlDB Cluster 여부 - 박성범님 질문(제가 대신해서 적습니다)

0

669

2