Flux.fromStream терпит неудачу при разделении и объединении - PullRequest
1 голос
/ 08 апреля 2019

У меня есть этот пример кода:

Flux<Integer> range = Flux.range(0, 10);
Flux<Long> longs = Flux.fromStream(new Random().longs(100, 500).boxed()); // (1)
// Flux<Long> longs = Flux.fromIterable(new Random().longs(100, 500).boxed().limit(30).collect(Collectors.toList())); // (2)

Flux<Tuple2<Integer, Long>> flux1 = Flux.zip(range, longs);

Flux<Integer> flux2 = flux1.map(e -> 2);
Flux<Integer> flux3 = flux1.map(e -> 3);

CountDownLatch countDownLatch = new CountDownLatch(1);

Flux.merge(flux2, flux3)
   .doOnComplete(() -> countDownLatch.countDown())
   .subscribe(e -> log.info("{}", e));

countDownLatch.await(1, TimeUnit.MINUTES);

Это не с:

Caused by: java.lang.IllegalStateException: stream has already been operated upon or closed
  at java.util.stream.AbstractPipeline.spliterator(AbstractPipeline.java:343)
  at java.util.stream.ReferencePipeline.iterator(ReferencePipeline.java:139)
  at reactor.core.publisher.FluxStream.subscribe(FluxStream.java:57)
  at reactor.core.publisher.Flux.subscribe(Flux.java:7777)
  at reactor.core.publisher.FluxZip$ZipCoordinator.subscribe(FluxZip.java:579)
  ...

Строка комментария (1) и строка комментария (2) решают проблему, но в моем случае использования longs не ограничен, как в (1). Как бы это исправить?

Реальный вариант использования - сделать что-то, когда оба flux2 и flux3 сделаны, у них есть побочные эффекты в вызовах map - запись в файл в этом случае, поэтому мне нужно убедиться, что все написано прежде чем я уйду.

Ответы [ 2 ]

0 голосов
/ 18 апреля 2019

Вы можете использовать defer:

Flux<Long> longs = Flux.defer(() -> Flux.fromStream(new Random().longs(100, 500).boxed()));

Даже если у вас есть только один явный subscribe, вы создаете более одного потока на основе longs. Подписка создается для каждого.

Flux.fromStream может быть подписан только один раз, потому что потоки Java могут использоваться только один раз.

defer решает эту проблему, создавая новый поток для каждого подписчика.

0 голосов
/ 09 апреля 2019

Flux по своему дизайну можно использовать повторно, однако в строке (2) вы используете Flux.fromStream потребляющие потоки Java (которые можно использовать только один раз) - вот почему вы получаете stream has already been operated upon or closed.

Одним из решений будет дублирование потока longs с тем же начальным значением для Random.

long seed = 1000000;

Flux<Long> longs = Flux.fromStream(new Random(seed).longs(100, 500).boxed());      
Flux<Long> longs1 = Flux.fromStream(new Random(seed).longs(100, 500).boxed()); 

Flux<Integer> flux2 = Flux.zip(range, longs).map(e -> 2);
Flux<Integer> flux3 = Flux.zip(range, longs1).map(e -> 3);
...