CLI로 실행과 코드로 실행하면 결과가 다르게 나옵니다
CREATE STREAM add_stream WITH (KAFKA_TOPIC='column_stream_topic', VALUE_FORMAT='JSON') AS SELECT *, CAST(NULL AS INT) AS new_column FROM test_stream EMIT CHANGES;이렇게 기존 test_stream에서 column을 추가한 add_stream을 만들려고 CLI문을 실행시키면
원래 test_stream에 담겨있는 data가 담아져서 나오는데
package com.example.service;
import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.ClientOptions;
import io.confluent.ksql.api.client.ExecuteStatementResult;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.concurrent.ExecutionException;
@Service
public class streamPracticeAdd {
@Value("${ksqldb.server.host}")
private String ksqlDbHost;
private int ksqlDbPort;
private Client client;
@PostConstruct
public void init() {
ClientOptions options = ClientOptions.create()
.setHost(ksqlDbHost)
.setPort(ksqlDbPort);
client = Client.create(options);
}
public void streamsAdd(String columnName, String dataType) {
String createStreamKsql = "CREATE STREAM add_stream WITH (KAFKA_TOPIC='column_stream_topic', VALUE_FORMAT='JSON') AS SELECT *, CAST(NULL AS " + dataType + ") AS " + columnName + " FROM test_stream EMIT CHANGES;";
try {
ExecuteStatementResult result = client.executeStatement(createStreamKsql).get();
System.out.println("Stream created and data inserted into new topic: " + result.queryId());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}/kafka/addColumn/new_column/INT 인 API 요청을 줘서 새 stream을 만드는 코드인데
실행시키면 기존 column에 새 column까지 추가는 되는데 기존 data가 하나도 들어오지 않습니다.
검색을 해봤는데도 잘 안나와서 질문 남깁니다 감사합니다.
Answer 1
0
안녕하십니까,
저도 Java API 인터페이스는 잘 모르지만,
streamsAdd() 메소드를 수행하면 add_stream Stream은 만들어지는데, test_stream에 데이타를 입력하면 add_stream에서 데이터가 출력되지 않는다는 건가요? 만약 그렇다면,
streamsAdd()메소드에서 stream 이름을 add_stream이 아니라 add_stream_new로 해서 새롭게 만들어 보시고, 함 테스트 해보시지요. 그리고 CLI에서 add_stream_new select 해서 데이터가 나오는지 확인해 보십시요.
안되면 다시 글 부탁드립니다.
감사합니다.
Ksqldb.io 가 confluent.io 로 리다이렉션 되요
0
49
2
복합키 디코딩 오류 질문드립니다.
0
75
2
Table의 상태(Stateful) 관리 질문
0
102
2
섹션 10 관련 강의자료
0
182
2
Compact Topic에 대하여
0
206
2
시스템 타임존과 카프카 Stream 타임존이 불일치합니다.
0
143
1
ksql DB 서버를 올릴떄 아래와 같이 에러 발생
0
147
2
스트림, 테이블 생성시 데이터 관련 문의
0
216
1
푸시 쿼리 종료 방법에 대해 문의 드립니다.
0
195
1
table의 데이터가 실시간으로 topic에 담기지 않습니다
0
226
1
debezium에서 ksqldb로
0
254
2
ksqldb는 workbench처럼 ui는 없을까요?
0
309
2
Pull 쿼리 제약에 대한 이유
0
218
1
IoT Event Streaming 적용에 대해서
0
210
1
[수정요청] Join이해 중 select inner join a.user_id 수정 필요
0
273
2
실무에서 카프카 환경 구축
0
533
2
[수정요청] Mview CSAS 강좌중에 Insert문장 수정 요청
0
248
1
inner join , outer join
0
199
1
group by 리파티션에 대한 질문
0
241
1
ksqldb timestamp 타입 질문
0
245
1
stream format 관련 질문
0
271
1
전통적 분석 시스템 한계에 대해 질문있습니다.
0
277
1
AWS 에서 confluent kafka 와 apache kafka 차이가 궁금 합니다.
0
1688
2
ksqlDB Cluster 여부 - 박성범님 질문(제가 대신해서 적습니다)
0
669
2

