Я пытаюсь построить алгоритм для исследования дерева с помощью Rx Java.
Моя причина использовать Rx Java заключается в том, что моя функция обработки узлов уже использует Rx Java для отправки RPC другим службам. Обратите внимание, что порядок, в котором исследуются различные ветви дерева, не важен.
В идеале я хотел бы иметь что-то вроде QueueFlowable<Node> extends Flowable<Node>
, которое предоставило бы функцию push
, которую Observers
мог бы использовать для добавления новых узлов в очередь после их обработки.
private static main(String[] args) {
QueueFlowable<Node> nodes = new QueueFlowable<>();
nodes.push(/* the root */);
nodes.concatMap(node -> process(node)).map(nodes::push));
nodes.blockingSubscribe();
}
// Processes the node and returns a Flowable of other nodes to process.
private static Flowable<Node> process(Node node) { ... }
Это звучит как что-то относительно распространенное (как я ожидаю, что веб-сканеры будут реализовывать нечто подобное), но мне все еще не удается заставить его работать.
Моя последняя попытка состояла в том, чтобы используйте PublishProcessor
следующим образом:
PublishProcessor<Mode> nodes = PublishProcessor.create();
nodes.concatMap(node -> process(node)).subscribe(nodes);
nodes.onNext(/* the root node */);
nodes.blockingSubscribe();
Конечно, это никогда не завершается, поскольку процессор не завершает работу.
Любая помощь будет высоко ценится!