У меня есть Observable<Person>
и метод из внешней библиотеки sendByStream(InputStream inputStream)
Я хочу передать свой выходной поток
final PipedOutputStream pipedOutputStream = new PipedOutputStream();
final PipedInputStream pipedInputStream = new PipedInputStream(pipedOutputStream);
с помощью моего наблюдаемого и отправить его этому внешнему методу.Я не знаю, как сделать это неблокирующим способом, так как sendByStream(InputStream inputStream)
является блокирующим.
Я пытаюсь преобразовать мой Observable
в Completable
, записать в поток в doOnNext
и использоватьметод doOnCompleted
для закрытия потоков и их отправки, но в результате сначала выполняется полное заполнение канала, а затем отправка его методом.
Должен ли я открыть другой поток для метода sendByStream
, или есть способ сделать этоэто в стиле rxJava?
Что у меня сейчас (очень наивный импл, но работает):
try {
PipedOutputStream pipedOutputStream = new PipedOutputStream();
PipedInputStream pipedInputStream = new PipedInputStream(pipedOutputStream);
new Thread(() -> {
facade.sendByStream(pipedInputStream);
try {;
pipedInputStream.close();
} catch(final IOException e) {
e.printStackTrace();
}
}).start();
return persons.
.doOnNext(p -> {
try {
pipedOutputStream.write(p.getBytes());
} catch(final IOException e) {
LOGGER.error("Problem with writing to output stream");
}
})
.toCompletable()
.doOnCompleted(() -> {
try {
LOGGER.info("Closing output stream");
pipedOutputStream.close();
LOGGER.info("output stream closed");
} catch(IOException e) {
LOGGER.error("Problem with closing output stream");
}
})
.doOnCompleted(() -> LOGGER.info("Sending file completed"));
} catch(final IOException e) {
return Completable.error(RuntimeException::new);
}