Как сгруппировать Flux в сегменты и оповещать, когда интервал не достигает определенного размера после определенного периода времени - PullRequest
0 голосов
/ 22 апреля 2019

Я пытался найти реактивное решение для следующей проблемы: скажем, у меня есть поток, который содержит элементы, выглядящие так:

class Node {
    boolean isParent;
    boolean isChild;
    int id;
    int parentId;
    // getters & setters
}

My Flux<Node> является "бесконечным" и содержит как родителей, так и детей Node s. Я хочу предупреждать, когда получен родитель, и 3 ребенка не создаются в течение 5 секунд после создания родителя. В первоначальном случае была подписка Kafka, но я хочу знать, как я могу это сделать в более общем случае.

Я пытался использовать windowWhen(), но он не работает должным образом, и я не знаю, как указать желаемое количество детей.

Flux<Node> myFlux = ... // defined somewhere earlier
myFlux
.windowWhen(
    myFlux.filter(node -> node.isParent()),
    parent -> myFlux
        .filter(node -> node.isChild && node.getParentId() == parent.getId())
        .timeout(Duration.ofSeconds(5))
        .doOnError(System.out::println))
            .subscribe();

Это хорошо компилируется и поражает все элементы (по крайней мере, так кажется из журналов), но когда я пытаюсь ограничить myFlux с помощью delayElements(), я получаю только следующее:

17:45:55.961 [main] INFO reactor.Flux.ConcatMap.1 - onSubscribe(FluxConcatMap.ConcatMapImmediate)
17:45:55.966 [main] INFO reactor.Flux.ConcatMap.1 - request(unbounded)

Я новичок в области реактивного программирования, поэтому я даже не уверен, что моя первоначальная идея повторного использования исходного потока для управления открытием / закрытием окон верна ...

...