У меня есть этот пример кода:
Flux<Integer> range = Flux.range(0, 10);
Flux<Long> longs = Flux.fromStream(new Random().longs(100, 500).boxed()); // (1)
// Flux<Long> longs = Flux.fromIterable(new Random().longs(100, 500).boxed().limit(30).collect(Collectors.toList())); // (2)
Flux<Tuple2<Integer, Long>> flux1 = Flux.zip(range, longs);
Flux<Integer> flux2 = flux1.map(e -> 2);
Flux<Integer> flux3 = flux1.map(e -> 3);
CountDownLatch countDownLatch = new CountDownLatch(1);
Flux.merge(flux2, flux3)
.doOnComplete(() -> countDownLatch.countDown())
.subscribe(e -> log.info("{}", e));
countDownLatch.await(1, TimeUnit.MINUTES);
Это не с:
Caused by: java.lang.IllegalStateException: stream has already been operated upon or closed
at java.util.stream.AbstractPipeline.spliterator(AbstractPipeline.java:343)
at java.util.stream.ReferencePipeline.iterator(ReferencePipeline.java:139)
at reactor.core.publisher.FluxStream.subscribe(FluxStream.java:57)
at reactor.core.publisher.Flux.subscribe(Flux.java:7777)
at reactor.core.publisher.FluxZip$ZipCoordinator.subscribe(FluxZip.java:579)
...
Строка комментария (1) и строка комментария (2) решают проблему, но в моем случае использования longs
не ограничен, как в (1). Как бы это исправить?
Реальный вариант использования - сделать что-то, когда оба flux2
и flux3
сделаны, у них есть побочные эффекты в вызовах map
- запись в файл в этом случае, поэтому мне нужно убедиться, что все написано прежде чем я уйду.