Я использую поток заданных значений, например, Flux<Set<Ingeger>>
, и хочу сопоставить каждый элемент набора с его указанным c потоком, например, Flux<String>
, и объединить их. Но вместо того, чтобы сохранять / отменять весь поток предыдущих отправленных установленных значений, последующие значения должны обрабатываться в соответствии с предыдущим элементом со следующими правилами.
- удаленные элементы завершают свои текущие потоки
- добавлен
Set
элементы создают новые потоки - существующие элементы сохраняют свои потоки
Следующий пример должен продемонстрировать проблему (фрагмент класса Example
).
Flux.just(Set.of(1, 2), Set.of(2), Set.of(1, 3))
.delayElements(Duration.ofMillis(300))
.doOnNext(e -> System.out.println(String.format("Consider %s", e)))
.switchMap(/* correct operator ??? */ set -> Flux.merge(set
.stream()
.map(Example::createNumberStream)
.collect(toList())))
.subscribe(e -> System.out.println("Next element " + e));
Thread.sleep(1200);
Приведенный выше фрагмент кода использует метод createNumberStream
класса Example
для сопоставления элемента Set
с соответствующим ему потоком.
private static Flux<String> createNumberStream(Integer e) {
return Mono.just(e).delayElement(Duration.ofMillis(100)).repeat().index()
.map(t -> String.format("%d (%d)", t.getT2(), t.getT1()));
}
Приведенный выше код должен выдавать вывод ниже (тогда как switchMap
пример оператора не подходит).
Consider [1, 2]
Next element 1 (0)
Next element 2 (0)
Next element 1 (1)
Next element 2 (1)
Consider [2]
Next element 2 (2)
Next element 2 (3)
Consider [1, 3]
Next element 1 (0)
Next element 3 (0)
Next element 3 (1)
Next element 1 (1)
Я рассмотрел flatMap
и switchMap
операторов, которые здесь не подходят. Чтобы сохранить существующие экземпляры, оператор scan
кажется ценным, но я думаю, что после него понадобятся дополнительные операторы.
Как я мог бы получить вышеприведенный результат с преобразованиями потока Flux?