У меня есть конвейер, который передает потоки DataBuffers для различных типов файлов. Недавно я заметил комбинацию источника и цели, которая полностью блокирует, вызывая тайм-аут. Я сузил его до источника InputStream и цели, которая выполняет InputStream.readAllBytes()
или IOUtils.toByteArray(InputStream)
из Apache -Commons IO.
@Test
public void testReadAllBytesFromInputStream() {
Path p = testResources.resolve("file.txt");
Flux<DataBuffer> buffer = DataBufferUtils.readInputStream(
() -> new FileInputStream(p.toFile()), leakAwareDataBufferFactory, 512);
byte[] bytes = getBytesfromFlux(buffer);
assertTrue( bytes.length > 0);
}
public byte[] getBytesFromFile(Flux<DataBuffer> buffer) throws IOException {
PipedOutputStream osPipe = new PipedOutputStream();
PipedInputStream isPipe = new PipedInputStream(osPipe);
DataBufferUtils.write(source, osPipe)
.onErrorResume(throwable -> {
try {
osPipe.close();
} catch (IOException ioe) {
//nothing
}
return Flux.error(throwable);
}).doOnComplete(() -> {
try {
osPipe.close();
} catch (IOException ioe) {
//nothing
}
}).subscribe(DataBufferUtils.releaseConsumer());
return isPipe.readAllBytes();
}
Простое выполнение блокирующего вызова и объединение их не будет работать, так как это ухудшит производительность потребителей, которые выполняют потоковую передачу правильно. При чтении из FileChannel он не блокируется. Кажется, проблема блокировки между переданными потоками и получателем байтов.