Flux.using не излучает предметы - PullRequest
0 голосов
/ 26 октября 2019

Я пытаюсь создать источник предметов на основе пулей. По какой-то причине после нажатия одного предмета потребитель ничего не получает. Весь трубопровод застрял. Чего мне не хватает?

private static Flux<ByteBuffer> testRun(String path) {
    return Flux.using(() -> {
        FileInputStream in = new FileInputStream(path);
        FileChannel channel = in.getChannel();
        return Tuple.of(in, channel);
    }, t -> s -> {
        ByteBuffer bb = ByteBuffer.allocate(1000);
        try {
            if (t._2.read(bb) > 0)
                s.onNext(bb.rewind());
            else
                s.onComplete();
        } catch (IOException ex) {
            s.onError(ex);
        }
    }, t -> {
        try {
            t._1.close();
            t._2.close();
        } catch (IOException ex) {
            throw new RuntimeException(ex);
        }
    });
}

1 Ответ

0 голосов
/ 26 октября 2019

Я понял. sourceSupplier из using вызывается только один раз.

private static Flux<ByteBuffer> testRun(String path) {

    return Flux.using(() -> {
        final FileInputStream in = new FileInputStream(path);
        final FileChannel channel = in.getChannel();
        return Tuple.of(in, channel);
    }, t -> {
        return Flux.generate(s -> {
            ByteBuffer bb = ByteBuffer.allocate(1000);
            try {
                if (t._2.read(bb) > 0)
                    s.next(bb.rewind());
                else
                    s.complete();
            } catch (IOException ex) {
                s.error(ex);
            }
        });
    }, t -> {
        try {
            t._1.close();
            t._2.close();
        } catch (IOException ex) {
            throw new RuntimeException(ex);
        }
    });
}
...