Asyn c исследование дерева с помощью RxJava - PullRequest
1 голос
/ 04 апреля 2020

Я пытаюсь построить алгоритм для исследования дерева с помощью Rx Java.

Моя причина использовать Rx Java заключается в том, что моя функция обработки узлов уже использует Rx Java для отправки RPC другим службам. Обратите внимание, что порядок, в котором исследуются различные ветви дерева, не важен.

В идеале я хотел бы иметь что-то вроде QueueFlowable<Node> extends Flowable<Node>, которое предоставило бы функцию push, которую Observers мог бы использовать для добавления новых узлов в очередь после их обработки.

private static main(String[] args) {
  QueueFlowable<Node> nodes = new QueueFlowable<>();

  nodes.push(/* the root */);
  nodes.concatMap(node -> process(node)).map(nodes::push));
  nodes.blockingSubscribe();
}

// Processes the node and returns a Flowable of other nodes to process.
private static Flowable<Node> process(Node node) { ... }

Это звучит как что-то относительно распространенное (как я ожидаю, что веб-сканеры будут реализовывать нечто подобное), но мне все еще не удается заставить его работать.

Моя последняя попытка состояла в том, чтобы используйте PublishProcessor следующим образом:

PublishProcessor<Mode> nodes = PublishProcessor.create();

nodes.concatMap(node -> process(node)).subscribe(nodes);
nodes.onNext(/* the root node */);

nodes.blockingSubscribe();

Конечно, это никогда не завершается, поскольку процессор не завершает работу.

Любая помощь будет высоко ценится!

...