Допустим, у меня есть Flux
следующих объектов
class A {
int id;
boolean isValid;
}
Всякий раз, когда испускается новый объект, я хочу запустить фиксированный таймер (например, 3 секунды) и буферизовать каждый следующий объект с таким же id , который прибудет в этот промежуток времени и испустит максимум один из них (Mono<A>
) (на основании некоторых вычислений)
Я обнаружил комбинацию groupBy
и buffer
операторы. Примерно так:
flux
.groupBy(A::getId)
.flatMap(g -> g
.buffer(Duration.ofSeconds(3))
.map(this::pickMaxOne) //or flatMapFromIterable(this::pickMaxOne)
.flatMap(mono -> mono) //instead those 2 lines
);
Дело в том, что groupBy
хорошо работает только для низкой мощности группы . В моем случае у меня может быть до 1 миллиона обновлений с разными идентификаторами и около 5k одновременно (в течение 3 секунд). Какие у меня есть альтернативы?
Кстати. buffer
не работает точно так, как я хочу. Он запускает таймер 3 с один раз для самого первого элемента. Затем последующие периоды фиксируются и не зависят от элемента (независимо от того, генерируется ли новый элемент, есть планировщик, который тикает каждые 3 с). Любые предложения приветствуются.