• 카테고리

    질문 & 답변
  • 세부 분야

    데이터 엔지니어링

  • 해결 여부

    미해결

tumbling window 가 close 되는 시점에 로직이 수행되도록 하고 싶습니다.

23.12.03 17:01 작성 조회수 129

0

        final KStream<String, MonitoringClass> ks0 = streamsBuilder.stream(INPUT_TOPIC, Consumed.with(STRING_SERDE, MONITORING_CLASS_SERDE));

        Duration windowDuration = Duration.ofMinutes(2);
        TimeWindows tumblingWindow = TimeWindows.of(windowDuration.toMillis()).grace(Duration.ZERO);

ks0
  .groupByKey()
  .windowedBy(tumblingWindow)
  .count()
  .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
   .toStream() 

안녕하세요, 다음과 같이 suppress() 함수를 사용해서 윈도우가 종료되는 시점에만 특정 로직을 실행하고 싶은데
suppress() 함수를 추가한 후부터는 테스트 실행시 아예 컨슈밍이 안되는 것 처럼 보여서 도움을 얻고자 글 남겼습니다.

해당 부분도 커밋 주기 때문에 예상대로 동작하지 않는 걸까요?

답변 2

·

답변을 작성해보세요.

0

안녕하세요. suppress()의 경우 말씀대로 윈도우가 종료되는 시점에 취합이 완료된 데이터를 추출합니다. 하지만 커밋이 된 이후에 추출이 됩니다. 커밋 interval 이후에 데이터가 추출되는지 확인해보시겠어요?

 

저는 로컬에서 다음과 같은 코드로 정상동작함을 확인했습니다.

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> ks0 = builder.stream("test");
Duration windowDuration = Duration.ofSeconds(3);
TimeWindows tumblingWindow = TimeWindows.ofSizeWithNoGrace(windowDuration);
ks0.print(Printed.toSysOut());
ks0.groupByKey()
    .windowedBy(tumblingWindow)
    .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>
            as("aggregation").withKeySerde(Serdes.String())
    )
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
    .toStream().print(Printed.toSysOut());
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

0

인프런 AI 인턴님의 프로필

인프런 AI 인턴

2023.12.05

안녕하세요, 인프런 AI 인턴입니다.
DC님의 질문에 대해 답변드리겠습니다.

먼저, Apache Kafka Streams에서 suppress() 메서드는 윈도우의 종료 시점까지 발생한 이벤트를 억제(suppress)하고, 해당 시점에만 결과를 전달하기 위해 사용됩니다. suppress(Suppressed.untilWindowCloses(...))는 윈도우가 종료될 때까지 결과를 기다렸다가 윈도우가 종료되면 그동안 억제했던 결과를 다운스트림으로 방출하도록 합니다.

하지만, suppress() 함수를 추가한 후에 컨슈머가 동작하지 않는 것 같다면, 몇 가지 확인해볼 사항이 있습니다:

  1. Kafka Streams 애플리케이션의 상태: Streams 애플리케이션이 정상적으로 실행 중인지 확인해보세요. 예상치 못한 예외나 오류로 인해 중단된 상태일 수 있습니다.

  2. 시간과 윈도우 설정: 스트림즈 애플리케이션은 시스템 시간 또는 이벤트 타임에 따라 윈도우를 처리합니다. 시스템의 시간 설정과 tumbling 윈도우의 기간 설정이 정확한지 확인하세요.

  3. 커밋 주기: Kafka 컨슈머의 커밋 주기가 길 경우 지연이 발생할 수 있으므로, 설정을 검토해보세요. auto.commit.interval.ms와 같은 컨슈머 커밋 관련 속성들이 영향을 줄 수 있습니다.

  4. 테스트 환경: 테스트를 진행할 때 Kafka Streams의 내부 시각이 실시간 시각과 동기화되고 있는지 확인하세요. 또한, 테스트 중에 충분한 이벤트가 발생하여 윈도우가 종료되도록 해야 합니다.

  5. 로그 확인: 애플리케이션의 로그를 확인하여 추가적인 정보나 오류 메시지를 파악해보세요.

문제를 해결하기 위해, 위 사항들을 확인하시고 다른 Kafka Streams의 구성 요소들이 정상적으로 동작하고 있는지 먼저 점검해보시기 바랍니다. 더 자세한 정보와 도움을 위해, Spring for Apache Kafka reference manual을 참조하시면 도움이 될 것입니다.

추가적으로 제공해 드린 Spring Boot Reference Documentation의 Kafka Streams 섹션도 참조해 보시길 권장드립니다. Suppress 기능과 관련된 자세한 내용이 필요하시면, 해당 문서의 “Kafka Streams Support” 섹션이 도움이 될 수 있습니다.

문제 해결에 도움이 되길 바랍니다.