Я реализовал свой собственный Transformer
, где я накапливаю события в моем StateStore
(во внутреннюю тему kafka журнала изменений), и как только он накапливает события X, я отправляю одно накопленное событие в поток.
Все идеально, но я не хочу, чтобы накопленные события застревали в канале, если в течение некоторого времени не публикуются новые события.
Вот почему я решил использовать Punctuator:
@Override
public void init(ProcessorContext context) {
this.context = context;
this.kvStore = (KeyValueStore<Patient, Long>) this.context.getStateStore(stateStore);
this.context.schedule(Duration.ofMinutes(180), PunctuationType.WALL_CLOCK_TIME, (timestamp) -> {
KeyValueIterator<Event, Long> iter = kvStore.all();
while (iter.hasNext()) {
KeyValue<Event, Long> entry = iter.next();
...
}
iter.close();
context.commit();
});
}
Проблема в том, что записи в StateStore
не содержат отправленной информации.Есть ли способ получить к нему доступ?Нужно ли мне реализовать свой собственный StateStore
, чтобы он работал?
Спасибо!