Я пытаюсь создать источник предметов на основе пулей. По какой-то причине после нажатия одного предмета потребитель ничего не получает. Весь трубопровод застрял. Чего мне не хватает?
private static Flux<ByteBuffer> testRun(String path) {
return Flux.using(() -> {
FileInputStream in = new FileInputStream(path);
FileChannel channel = in.getChannel();
return Tuple.of(in, channel);
}, t -> s -> {
ByteBuffer bb = ByteBuffer.allocate(1000);
try {
if (t._2.read(bb) > 0)
s.onNext(bb.rewind());
else
s.onComplete();
} catch (IOException ex) {
s.onError(ex);
}
}, t -> {
try {
t._1.close();
t._2.close();
} catch (IOException ex) {
throw new RuntimeException(ex);
}
});
}