Моя задача заключается в следующем:
Я отслеживаю события синхронизации времени от стороннего измерительного устройства.Эта временная синхронизация немного ненадежна, поэтому я хочу определить, когда синхронизация останавливается, и выдать сигнал тревоги.
Для этого я создаю события синхронизации в теме Кафки.У меня три разных события:
- Запрос синхронизации
- Синхронизация выполнена успешно
- Ошибка синхронизации, так как другое устройство не отвечает
Итак, что я хочу сделать:
- Когда запрос получен, и по истечении определенного времени ничего не получено, я хочу подать сигнал тревоги «тайм-аут»
- Когда запросПолучено, и в течение периода ожидания наступает событие успеха. Я хочу выдать «тайм-аут», если по истечении времени ожидания запрос не поступает
- Когда приходит событие сбоя, я хочу выпустить «другое устройство».не ответил "alarm
В настоящее время я нахожусь в процессе настройки приложения Kafka-Streams, и мне нужно сохранить состояние в случае сбоя этого приложения (не должно, но я хочу бытьконечно), поэтому я настроил это следующим образом:
val builder = new StreamsBuilder
val storeBuilder = Stores.
keyValueStoreBuilder(Stores.persistentKeyValueStore("timesync-alarms"),
Serdes.String(),
logEntrySerde)
builder.addStateStore(storeBuilder)
val eventStream = builder.stream(sourceTopic, Consumed.`with`(Serdes.String(), logEntrySerde))
Теперь я застрял.То, что я в основном думаю, что мне нужно сделать, имеет функцию flatMap
для eventStream, которая всякий раз, когда происходит событие:
- Запрашивает хранилище для последнего обработанного события
- Решает, должен ли быть подан сигнал тревоги
- Обновляет магазин с полученным в данный момент событием
- Создает сигнал тревоги, если таковой имеется
Итак, как мне достичьшаги 1 и 3 здесь?Или я концептуально не прав и должен делать это по-другому?