Отображение заданных значений потоков Flux в поток относительно последнего набора - PullRequest
0 голосов
/ 05 марта 2020

Я использую поток заданных значений, например, Flux<Set<Ingeger>>, и хочу сопоставить каждый элемент набора с его указанным c потоком, например, Flux<String>, и объединить их. Но вместо того, чтобы сохранять / отменять весь поток предыдущих отправленных установленных значений, последующие значения должны обрабатываться в соответствии с предыдущим элементом со следующими правилами.

  • удаленные элементы завершают свои текущие потоки
  • добавлен Set элементы создают новые потоки
  • существующие элементы сохраняют свои потоки

Следующий пример должен продемонстрировать проблему (фрагмент класса Example).

Flux.just(Set.of(1, 2), Set.of(2), Set.of(1, 3))
        .delayElements(Duration.ofMillis(300))
        .doOnNext(e -> System.out.println(String.format("Consider %s", e)))
        .switchMap(/* correct operator ??? */ set -> Flux.merge(set
                .stream()
                .map(Example::createNumberStream)
                .collect(toList())))
        .subscribe(e -> System.out.println("Next element " + e));

Thread.sleep(1200);

Приведенный выше фрагмент кода использует метод createNumberStream класса Example для сопоставления элемента Set с соответствующим ему потоком.

private static Flux<String> createNumberStream(Integer e) {
    return Mono.just(e).delayElement(Duration.ofMillis(100)).repeat().index()
            .map(t -> String.format("%d (%d)", t.getT2(), t.getT1()));
}

Приведенный выше код должен выдавать вывод ниже (тогда как switchMap пример оператора не подходит).

Consider [1, 2]
Next element 1 (0)
Next element 2 (0)
Next element 1 (1)
Next element 2 (1)
Consider [2]
Next element 2 (2)
Next element 2 (3)
Consider [1, 3]
Next element 1 (0)
Next element 3 (0)
Next element 3 (1)
Next element 1 (1)

Я рассмотрел flatMap и switchMap операторов, которые здесь не подходят. Чтобы сохранить существующие экземпляры, оператор scan кажется ценным, но я думаю, что после него понадобятся дополнительные операторы.

Как я мог бы получить вышеприведенный результат с преобразованиями потока Flux?

...