Реактор: разверните ParallelFlux - PullRequest
0 голосов
/ 04 февраля 2019

У меня есть коллекция предметов, которые необходимо расширить, поэтому я выбрал реактор для его реактивных возможностей, поскольку расширение требует операций ввода-вывода.

Вот фрагмент рабочего кода:

public Flux<Item> expand(List<Item> unprocessedItems) {
  return Flux.fromIterable(unprocessedItems)
    .expandDeep(this::expandItem);
}

Обратите внимание, что this::expandItem является операцией блокировки (несколько запросов к базе данных, некоторые вычисления, ...).Теперь я хотел бы, чтобы это расширение было параллельным, но, насколько я знаю, .expand() и .expandDeep() являются только членами класса Flux, а не класса ParallelFlux.Я попытался добавить .publishOn() и .subscribeOn() перед вызовом .expand(), но без удачи.

Я впервые использую реактор, но я не вижу никаких технических проблем, препятствующих параллельному расширению, есть ли способ?сделать это?API отсутствует или я что-то упустил?

1 Ответ

0 голосов
/ 04 февраля 2019

да, вы правы ParallelFlux не имеет методов .expand() и .expandDeep(), но я могу использовать другой способ, создать дополнительный Publisher с методом расширения и передать его в ParallelFlux, например:

public static void main(String[] args) {      

    Function<Node, Flux<Node>> expander =
        node -> Flux.fromIterable(node.children);

    List<Node> roots = createTestNodes();

    Flux.fromIterable(roots)
        .parallel(4)
        .runOn(Schedulers.parallel())
        .flatMap(node -> Flux.just(node).expandDeep(expander))
        .doOnNext(i -> System.out.println("Time: " + System.currentTimeMillis() + " thread: " + Thread.currentThread().getName() + " value: " + i))
        .sequential()
        .subscribe();

    try {
        Thread.sleep(500);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("finished");

}

Мои данные испытаний:

static final class Node {
    final String name;
    final List<Node> children;

    Node(String name, Node... nodes) {
        this.name = name;
        this.children = new ArrayList<>();
        children.addAll(Arrays.asList(nodes));
    }

    @Override
    public String toString() {
        return name;
    }
}

static List<Node> createTestNodes() {
    return new Node("root",
        new Node("1",
            new Node("11")
        ),
        new Node("2",
            new Node("21"),
            new Node("22",
                new Node("221")
            )
        ),
        new Node("3",
            new Node("31"),
            new Node("32",
                new Node("321")
            ),
            new Node("33",
                new Node("331"),
                new Node("332",
                    new Node("3321")
                )
            )
        ),
        new Node("4",
            new Node("41"),
            new Node("42",
                new Node("421")
            ),
            new Node("43",
                new Node("431"),
                new Node("432",
                    new Node("4321")
                )
            ),
            new Node("44",
                new Node("441"),
                new Node("442",
                    new Node("4421")
                ),
                new Node("443",
                    new Node("4431"),
                    new Node("4432")
                )
            )
        )
    ).children;
}

И результат:

Time: 1549296674522 thread: parallel-4 value: 4
Time: 1549296674523 thread: parallel-4 value: 41
Time: 1549296674523 thread: parallel-2 value: 2
Time: 1549296674523 thread: parallel-2 value: 21
Time: 1549296674523 thread: parallel-3 value: 3
Time: 1549296674523 thread: parallel-3 value: 31
Time: 1549296674523 thread: parallel-1 value: 1
Time: 1549296674523 thread: parallel-1 value: 11
Time: 1549296674525 thread: parallel-2 value: 22
Time: 1549296674525 thread: parallel-2 value: 221
Time: 1549296674526 thread: parallel-3 value: 32
Time: 1549296674526 thread: parallel-3 value: 321
Time: 1549296674526 thread: parallel-3 value: 33
Time: 1549296674526 thread: parallel-3 value: 331
Time: 1549296674526 thread: parallel-3 value: 332
Time: 1549296674526 thread: parallel-3 value: 3321
Time: 1549296674526 thread: parallel-4 value: 42
Time: 1549296674526 thread: parallel-4 value: 421
Time: 1549296674526 thread: parallel-4 value: 43
Time: 1549296674526 thread: parallel-4 value: 431
Time: 1549296674526 thread: parallel-4 value: 432
Time: 1549296674526 thread: parallel-4 value: 4321
Time: 1549296674527 thread: parallel-4 value: 44
Time: 1549296674527 thread: parallel-4 value: 441
Time: 1549296674527 thread: parallel-4 value: 442
Time: 1549296674527 thread: parallel-4 value: 4421
Time: 1549296674528 thread: parallel-4 value: 443
Time: 1549296674528 thread: parallel-4 value: 4431
Time: 1549296674528 thread: parallel-4 value: 4432

Как видите, expander работал в параллельных потоках.

...