Как сделать преобразование потока Кафки (map / flatMap) с учетом значений в хранилище ключей / значений? - PullRequest
0 голосов
/ 08 июня 2018

Моя задача заключается в следующем:

Я отслеживаю события синхронизации времени от стороннего измерительного устройства.Эта временная синхронизация немного ненадежна, поэтому я хочу определить, когда синхронизация останавливается, и выдать сигнал тревоги.

Для этого я создаю события синхронизации в теме Кафки.У меня три разных события:

  1. Запрос синхронизации
  2. Синхронизация выполнена успешно
  3. Ошибка синхронизации, так как другое устройство не отвечает

Итак, что я хочу сделать:

  • Когда запрос получен, и по истечении определенного времени ничего не получено, я хочу подать сигнал тревоги «тайм-аут»
  • Когда запросПолучено, и в течение периода ожидания наступает событие успеха. Я хочу выдать «тайм-аут», если по истечении времени ожидания запрос не поступает
  • Когда приходит событие сбоя, я хочу выпустить «другое устройство».не ответил "alarm

В настоящее время я нахожусь в процессе настройки приложения Kafka-Streams, и мне нужно сохранить состояние в случае сбоя этого приложения (не должно, но я хочу бытьконечно), поэтому я настроил это следующим образом:

val builder = new StreamsBuilder
val storeBuilder = Stores.
  keyValueStoreBuilder(Stores.persistentKeyValueStore("timesync-alarms"),
                       Serdes.String(),
                       logEntrySerde)
builder.addStateStore(storeBuilder)
val eventStream = builder.stream(sourceTopic, Consumed.`with`(Serdes.String(), logEntrySerde))

Теперь я застрял.То, что я в основном думаю, что мне нужно сделать, имеет функцию flatMap для eventStream, которая всякий раз, когда происходит событие:

  1. Запрашивает хранилище для последнего обработанного события
  2. Решает, должен ли быть подан сигнал тревоги
  3. Обновляет магазин с полученным в данный момент событием
  4. Создает сигнал тревоги, если таковой имеется

Итак, как мне достичьшаги 1 и 3 здесь?Или я концептуально не прав и должен делать это по-другому?

1 Ответ

0 голосов
/ 10 июня 2018

Я думаю, вам не нужно напрямую использовать State Store.Вы можете создать два потока - один с событиями запроса синхронизации, второй с ответами синхронизации (успех, сбой) и присоединиться к ним:

requestStream.outerJoin(responseStream, (leftVal, rightVal) -> ...,
    JoinWindows.of(timeout), ...);

В случае таймаута rightVal равен нулю.

Если вы хотите отправить сигналы тревоги в отдельную тему, вы можете просто отфильтровать объединенный поток и записать все ошибки (ответы на ошибки и тайм-ауты) в тему.В противном случае вы можете использовать метод peek() и вызвать какое-либо действие внутри.Вот простой пример: https://github.com/djarza/football-events/blob/master/football-ui/src/main/java/org/djar/football/ui/projection/StatisticsPublisher.java

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...