Kafka Streams Punctuator проверяет время отправки записи в StateStore - PullRequest
0 голосов
/ 26 апреля 2019

Я реализовал свой собственный Transformer, где я накапливаю события в моем StateStore (во внутреннюю тему kafka журнала изменений), и как только он накапливает события X, я отправляю одно накопленное событие в поток.

Все идеально, но я не хочу, чтобы накопленные события застревали в канале, если в течение некоторого времени не публикуются новые события.

Вот почему я решил использовать Punctuator:

@Override
public void init(ProcessorContext context) {
    this.context = context;
    this.kvStore = (KeyValueStore<Patient, Long>) this.context.getStateStore(stateStore);
    this.context.schedule(Duration.ofMinutes(180), PunctuationType.WALL_CLOCK_TIME, (timestamp) -> {
      KeyValueIterator<Event, Long> iter = kvStore.all();
      while (iter.hasNext()) {
        KeyValue<Event, Long> entry = iter.next();
        ...
      }
      iter.close();
      context.commit();
    });

}

Проблема в том, что записи в StateStore не содержат отправленной информации.Есть ли способ получить к нему доступ?Нужно ли мне реализовать свой собственный StateStore, чтобы он работал?

Спасибо!

...