Почему агрегат работает при использовании SessionWindows в Kafka Streams? - PullRequest
0 голосов
/ 14 ноября 2018

Я протестировал простой код, используя SessionWindows в Kafka Streams.
(Версия Spring Cloud Stream: Elmhurst.SR1, версия kafka: 1.0.1.)

@EnableBinding(StreamTestChanel.class)
public class MyKafkaStreamsTest {
   private final Logger logger = LoggerFactory.getLogger(getClass());

   @Value("${window.session.gapSeconds:5}")
   private long gapSeconds;
   @Value("${window.session.durationSeconds:20}")
   private long durationSeconds;

   private SessionWindows sessionWindows = SessionWindows.with(TimeUnit.SECONDS.toMillis(gapSeconds)).until(TimeUnit.SECONDS.toMillis(durationSeconds));

   @StreamListener(StreamTestChanel.CHANEL_INPUT)
   public void process(KStream<String, MyLog> input) {

       input
            .groupByKey()
            .windowedBy(sessionWindows)
            .count() // aggregation
            .toStream() // KTable to KStream
            .foreach((key, count) -> {
                logger.debug("## start --> window time : [{}] - [{}], key : {}({})", DateFormatUtils.format(key.window().start(), "yyyy-MM-dd HH:mm:ss.SSS"), DateFormatUtils.format(key.window().end(), "yyyy-MM-dd HH:mm:ss.SSS"), key.key(), count);
            });
   }
}

После тестирования журнал остается как показано ниже.

2018-11-14 17:22:29 [DEBUG](MyKafkaStreamsTest.java:52) ## start --> window time : [2018-11-14 17:22:24.396] - [2018-11-14 17:22:28.866], key : test15(75)
2018-11-14 17:22:34 [DEBUG](MyKafkaStreamsTest.java:52) ## start --> window time : [2018-11-14 17:22:24.396] - [2018-11-14 17:22:28.866], key : test15(null)
2018-11-14 17:22:44 [DEBUG](MyKafkaStreamsTest.java:52) ## start --> window time : [2018-11-14 17:22:24.421] - [2018-11-14 17:22:38.979], key : test06(null)
2018-11-14 17:22:44 [DEBUG](MyKafkaStreamsTest.java:52) ## start --> window time : [2018-11-14 17:22:24.421] - [2018-11-14 17:22:40.103], key : test06(165)
2018-11-14 17:22:44 [DEBUG](MyKafkaStreamsTest.java:52) ## start --> window time : [2018-11-14 17:22:24.438] - [2018-11-14 17:22:38.992], key : test01(null)
2018-11-14 17:22:45 [DEBUG](MyKafkaStreamsTest.java:52) ## start --> window time : [2018-11-14 17:22:24.438] - [2018-11-14 17:22:42.256], key : test01(165)

https://kafka.apache.org/10/documentation/streams/developer-guide/dsl-api.html#session-windows

Я прочитал это, но я не знаю, почему значение счетчика равно нулю, и время начала и окончания окна перекрываются. Я что-то сделал не так?

пожалуйста, помогите мне.

1 Ответ

0 голосов
/ 15 ноября 2018

Если объединяются два окна сеанса, сначала создается новое объединенное окно, а затем удаляются старые два окна. null s, которые вы видите - это те удаления.

...