Попытка чтения Flux <DataBuffer>в InputStream с использованием потоковых блоков ввода / вывода и тайм-аута - PullRequest
0 голосов
/ 04 мая 2020

У меня есть конвейер, который передает потоки DataBuffers для различных типов файлов. Недавно я заметил комбинацию источника и цели, которая полностью блокирует, вызывая тайм-аут. Я сузил его до источника InputStream и цели, которая выполняет InputStream.readAllBytes() или IOUtils.toByteArray(InputStream) из Apache -Commons IO.

@Test
public void testReadAllBytesFromInputStream() {
        Path p = testResources.resolve("file.txt");
        Flux<DataBuffer> buffer = DataBufferUtils.readInputStream(
                () -> new FileInputStream(p.toFile()), leakAwareDataBufferFactory, 512);
        byte[] bytes = getBytesfromFlux(buffer);
        assertTrue( bytes.length > 0);
}

public byte[] getBytesFromFile(Flux<DataBuffer> buffer) throws IOException {
        PipedOutputStream osPipe = new PipedOutputStream();
        PipedInputStream isPipe = new PipedInputStream(osPipe);
        DataBufferUtils.write(source, osPipe)
            .onErrorResume(throwable -> {
                try {
                    osPipe.close();
                } catch (IOException ioe) {
                    //nothing
                }
                return Flux.error(throwable);
            }).doOnComplete(() -> {
                try {
                    osPipe.close();
                } catch (IOException ioe) {
                    //nothing
                }
        }).subscribe(DataBufferUtils.releaseConsumer());
        return isPipe.readAllBytes();
}

Простое выполнение блокирующего вызова и объединение их не будет работать, так как это ухудшит производительность потребителей, которые выполняют потоковую передачу правильно. При чтении из FileChannel он не блокируется. Кажется, проблема блокировки между переданными потоками и получателем байтов.

...