Kafka Stream - Как отправить оповещение, если в течение определенного промежутка времени для данного ключа не было получено ни одного события - PullRequest
2 голосов
/ 30 мая 2019

Мне нужно отправить предупреждение, если в теме не было получено ни одного события в течение определенного периода времени. Как лучше всего решить этот вариант использования с KafkaStream?

Я пытался:

1) windowedBy вместе с оператором подавления :

    stream
        .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofMillis(1000)).grace(Duration.ZERO))
        .count()
        .suppress(Suppressed.untilWindowCloses(unbounded()))
        .filter((k, v) -> v == 0)
        .toStream()
        .map((windowId, count) -> KeyValue.pair(windowId.key(), AlarmEvent.builder().build()))
        .to(ALARMS, Produced.with(Serdes.String(), AlarmEvent.serde()));

Но, похоже, что окно не закроется, пока не произойдет событие после истечения срока действия, поэтому ни один сигнал тревоги не может быть отправлен точно после определенного времени ожидания.

2) Используя процессор API с пунктатором , похоже, что он работает, но я тестировал только с TopologyTestDriver и advanceWallClockTime () . Не уверен, что advanceWallClockTime () отражает аванс в реальном времени или изменится только при получении события, что приведет к возврату к проблеме в 1).

3) Если пунктуатор работает, я хотел бы использовать его в ValueTranformer , чтобы извлечь выгоду из топологии DSL. Однако я сталкиваюсь с проблемой, описанной в Как переслать событие вниз по течению от экземпляра Punctuator в ValueTransformer? . Невозможно отправить событие ниже по течению от экземпляра пунктуатора.

4) Наконец, у меня возникла идея регулярно вводить фиктивные события (например, каждую секунду) для каждого раздела, чтобы искусственно заставить внутренние часы двигаться вперед. Это позволило бы мне использовать чистое и простое окно DSL и подавлять операторы.

1 Ответ

0 голосов
/ 03 июня 2019

2) Использование процессора API с пунктатором, похоже, работает, но я тестировал только с TopologyTestDriver и advanceWallClockTime (). Не уверен, что advanceWallClockTime () отражает аванс в реальном времени или изменится только при получении события, таким образом, возвращаясь к проблеме в 1).

Это правильный подход. Как видно из названия, знаки препинания могут срабатывать в зависимости от времени настенных часов (т. Е. Системного времени). TopologyTestDriver проверяет время настенных часов для тестирования, но KafkaStreams будет использовать системное время.

3) Если пунктуатор работает, я хотел бы использовать его в ValueTranformer, чтобы извлечь выгоду из топологии DSL. Тем не менее я столкнулся с проблемой, описанной в разделе Как пересылать событие вниз по течению от экземпляра Punctuator в ValueTransformer ?. Невозможно отправить событие ниже по течению от экземпляра пунктуатора.

Вам нужно использовать transform() вместо этого. Передача данных через forward() недопустима в пунктуации ValueTransformer, потому что вы можете испустить любой ключ, нарушив этот контракт неизмененного ключа.

4) Наконец, у меня появилась идея регулярно вводить фиктивные события (например, каждую секунду) для каждого раздела, чтобы искусственно заставить внутренние часы двигаться вперед. Это позволило бы мне использовать чистое и простое окно DSL и подавлять операторы.

Это тоже должно сработать.

...