Flux.groupBy альтернатива для большого количества групп - PullRequest
0 голосов
/ 26 февраля 2020

Допустим, у меня есть Flux следующих объектов

class A {
    int id;
    boolean isValid;
}

Всякий раз, когда испускается новый объект, я хочу запустить фиксированный таймер (например, 3 секунды) и буферизовать каждый следующий объект с таким же id , который прибудет в этот промежуток времени и испустит максимум один из них (Mono<A>) (на основании некоторых вычислений)

Я обнаружил комбинацию groupBy и buffer операторы. Примерно так:

flux
   .groupBy(A::getId)
   .flatMap(g -> g
         .buffer(Duration.ofSeconds(3))
         .map(this::pickMaxOne) //or flatMapFromIterable(this::pickMaxOne)
         .flatMap(mono -> mono) //instead those 2 lines
   );

Дело в том, что groupBy хорошо работает только для низкой мощности группы . В моем случае у меня может быть до 1 миллиона обновлений с разными идентификаторами и около 5k одновременно (в течение 3 секунд). Какие у меня есть альтернативы?

Кстати. buffer не работает точно так, как я хочу. Он запускает таймер 3 с один раз для самого первого элемента. Затем последующие периоды фиксируются и не зависят от элемента (независимо от того, генерируется ли новый элемент, есть планировщик, который тикает каждые 3 с). Любые предложения приветствуются.

...