Я пытался найти реактивное решение для следующей проблемы: скажем, у меня есть поток, который содержит элементы, выглядящие так:
class Node {
boolean isParent;
boolean isChild;
int id;
int parentId;
// getters & setters
}
My Flux<Node>
является "бесконечным" и содержит как родителей, так и детей Node
s. Я хочу предупреждать, когда получен родитель, и 3 ребенка не создаются в течение 5 секунд после создания родителя. В первоначальном случае была подписка Kafka, но я хочу знать, как я могу это сделать в более общем случае.
Я пытался использовать windowWhen()
, но он не работает должным образом, и я не знаю, как указать желаемое количество детей.
Flux<Node> myFlux = ... // defined somewhere earlier
myFlux
.windowWhen(
myFlux.filter(node -> node.isParent()),
parent -> myFlux
.filter(node -> node.isChild && node.getParentId() == parent.getId())
.timeout(Duration.ofSeconds(5))
.doOnError(System.out::println))
.subscribe();
Это хорошо компилируется и поражает все элементы (по крайней мере, так кажется из журналов), но когда я пытаюсь ограничить myFlux
с помощью delayElements()
, я получаю только следующее:
17:45:55.961 [main] INFO reactor.Flux.ConcatMap.1 - onSubscribe(FluxConcatMap.ConcatMapImmediate)
17:45:55.966 [main] INFO reactor.Flux.ConcatMap.1 - request(unbounded)
Я новичок в области реактивного программирования, поэтому я даже не уверен, что моя первоначальная идея повторного использования исходного потока для управления открытием / закрытием окон верна ...