tumbling window 가 close 되는 시점에 로직이 수행되도록 하고 싶습니다.
431
작성한 질문수 4
final KStream<String, MonitoringClass> ks0 = streamsBuilder.stream(INPUT_TOPIC, Consumed.with(STRING_SERDE, MONITORING_CLASS_SERDE));
Duration windowDuration = Duration.ofMinutes(2);
TimeWindows tumblingWindow = TimeWindows.of(windowDuration.toMillis()).grace(Duration.ZERO);
ks0
.groupByKey()
.windowedBy(tumblingWindow)
.count()
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream() 안녕하세요, 다음과 같이 suppress() 함수를 사용해서 윈도우가 종료되는 시점에만 특정 로직을 실행하고 싶은데
suppress() 함수를 추가한 후부터는 테스트 실행시 아예 컨슈밍이 안되는 것 처럼 보여서 도움을 얻고자 글 남겼습니다.
해당 부분도 커밋 주기 때문에 예상대로 동작하지 않는 걸까요?
답변 2
0
안녕하세요. suppress()의 경우 말씀대로 윈도우가 종료되는 시점에 취합이 완료된 데이터를 추출합니다. 하지만 커밋이 된 이후에 추출이 됩니다. 커밋 interval 이후에 데이터가 추출되는지 확인해보시겠어요?
저는 로컬에서 다음과 같은 코드로 정상동작함을 확인했습니다.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> ks0 = builder.stream("test");
Duration windowDuration = Duration.ofSeconds(3);
TimeWindows tumblingWindow = TimeWindows.ofSizeWithNoGrace(windowDuration);
ks0.print(Printed.toSysOut());
ks0.groupByKey()
.windowedBy(tumblingWindow)
.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>
as("aggregation").withKeySerde(Serdes.String())
)
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream().print(Printed.toSysOut());
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
0
안녕하세요, 인프런 AI 인턴입니다.
DC님의 질문에 대해 답변드리겠습니다.
먼저, Apache Kafka Streams에서 suppress() 메서드는 윈도우의 종료 시점까지 발생한 이벤트를 억제(suppress)하고, 해당 시점에만 결과를 전달하기 위해 사용됩니다. suppress(Suppressed.untilWindowCloses(...))는 윈도우가 종료될 때까지 결과를 기다렸다가 윈도우가 종료되면 그동안 억제했던 결과를 다운스트림으로 방출하도록 합니다.
하지만, suppress() 함수를 추가한 후에 컨슈머가 동작하지 않는 것 같다면, 몇 가지 확인해볼 사항이 있습니다:
Kafka Streams 애플리케이션의 상태: Streams 애플리케이션이 정상적으로 실행 중인지 확인해보세요. 예상치 못한 예외나 오류로 인해 중단된 상태일 수 있습니다.
시간과 윈도우 설정: 스트림즈 애플리케이션은 시스템 시간 또는 이벤트 타임에 따라 윈도우를 처리합니다. 시스템의 시간 설정과 tumbling 윈도우의 기간 설정이 정확한지 확인하세요.
커밋 주기: Kafka 컨슈머의 커밋 주기가 길 경우 지연이 발생할 수 있으므로, 설정을 검토해보세요.
auto.commit.interval.ms와 같은 컨슈머 커밋 관련 속성들이 영향을 줄 수 있습니다.테스트 환경: 테스트를 진행할 때 Kafka Streams의 내부 시각이 실시간 시각과 동기화되고 있는지 확인하세요. 또한, 테스트 중에 충분한 이벤트가 발생하여 윈도우가 종료되도록 해야 합니다.
로그 확인: 애플리케이션의 로그를 확인하여 추가적인 정보나 오류 메시지를 파악해보세요.
문제를 해결하기 위해, 위 사항들을 확인하시고 다른 Kafka Streams의 구성 요소들이 정상적으로 동작하고 있는지 먼저 점검해보시기 바랍니다. 더 자세한 정보와 도움을 위해, Spring for Apache Kafka reference manual을 참조하시면 도움이 될 것입니다.
추가적으로 제공해 드린 Spring Boot Reference Documentation의 Kafka Streams 섹션도 참조해 보시길 권장드립니다. Suppress 기능과 관련된 자세한 내용이 필요하시면, 해당 문서의 “Kafka Streams Support” 섹션이 도움이 될 수 있습니다.
문제 해결에 도움이 되길 바랍니다.
consume 이후 lag가 줄어들지 않음
0
75
2
안녕하세요. 강의의 카프카 버전과 현재 시점의 카프카의 차이점 문의 드립니다.
0
118
2
멱등성 프로듀서 retries 관련 질문입니다.
0
99
2
채팅 서비스 개발 시 주의점이 있을까요?
0
100
2
충분히 큰 파티션 생성시 궁금증이 존재합니다.
0
88
2
KTable 키가 없는 레코드 처리
0
73
2
컨슈머 테스트 코드 작성
0
94
2
리밸런스 onPartitionRevoked이 필요한 상황
0
74
2
카프카 클러스터에서 감당 가능한 파티션(레플리카) 수 문의
0
120
2
reset offset 질문
0
77
2
KStreamJoinKTable 실행시 오류
0
78
2
auto.commit.interval.ms 옵션 관련 질문 드립니다.
0
142
2
파티션, 컨슈머 그룹, 컨슈머 관련 질문
0
117
1
java, kotlin
0
130
2
shutdownThread 에 대한 문의 입니다.
0
155
2
zookeeper실행시 오류가 발생합니다.
0
271
2
커스텀 소스 커넥터에서 Thread.sleep (1000) 은 왜 하는거에요?
0
153
2
처리량을 늘리기 위해서 파티션을 늘리고 컨슈머를 늘려야한다고 설명하셨는데요
0
181
3
파티션 개수와 컨슈머 개수의 처리량 관련 질문
0
164
2
동영상 및 이미지 처리 관련 문의 드립니다.
0
226
2
주키퍼 없이 사용 문의 드립니다.
0
303
1
kafka 를 띄우니 오류가 발생하고 종료합니다.
0
281
3
zookeeper 실행시 오류 .. 무엇을 해야 할까요?
0
220
2
파티션 추가로 해결할 수 있지만 늘어난 파티션은 줄일 수 없지 않나요?
0
192
2





