Kafka Streams, используя Session State Store с процессором Api - PullRequest
0 голосов
/ 19 ноября 2018

Как вы используете хранилище состояний сессии с процессором API?Кажется, вам нужно знать конец окна (который неизвестен) во время обработки записи.Чтобы положить в магазин ключ, это Windowed, который состоит из ключа и окна.Окно инстанцируется временем начала и окончания.Откуда ты знаешь время окончания?Время окончания будет определено, когда в деятельности будет промежуток, который известен только после обработки записи (и, возможно, еще больше).Вот как я ожидал, что это сработает.

@Override
public void init(ProcessorContext processorContext) {
    this.context = processorContext;
    store = ((SessionStore) context.getStateStore("sessionstore"));
}

@Override
public void process(String key, MyValueClass value) {
   //I would expect something like
   store.put(key,value,timestamp)
}
...