RxJava - отправка объектов через входной поток - PullRequest
0 голосов
/ 19 октября 2018

У меня есть Observable<Person> и метод из внешней библиотеки sendByStream(InputStream inputStream)

Я хочу передать свой выходной поток

final PipedOutputStream pipedOutputStream = new PipedOutputStream();
final PipedInputStream pipedInputStream = new PipedInputStream(pipedOutputStream);

с помощью моего наблюдаемого и отправить его этому внешнему методу.Я не знаю, как сделать это неблокирующим способом, так как sendByStream(InputStream inputStream) является блокирующим.

Я пытаюсь преобразовать мой Observable в Completable, записать в поток в doOnNext и использоватьметод doOnCompleted для закрытия потоков и их отправки, но в результате сначала выполняется полное заполнение канала, а затем отправка его методом.

Должен ли я открыть другой поток для метода sendByStream, или есть способ сделать этоэто в стиле rxJava?

Что у меня сейчас (очень наивный импл, но работает):

try {
    PipedOutputStream pipedOutputStream = new PipedOutputStream();
    PipedInputStream pipedInputStream = new PipedInputStream(pipedOutputStream);
    new Thread(() -> {
        facade.sendByStream(pipedInputStream);
        try {;
            pipedInputStream.close();
        } catch(final IOException e) {
            e.printStackTrace();
        }
    }).start();

    return persons.
        .doOnNext(p -> {
            try {
                pipedOutputStream.write(p.getBytes());
            } catch(final IOException e) {
                LOGGER.error("Problem with writing to output stream");
            }
        })
        .toCompletable()
        .doOnCompleted(() -> {
            try {
                LOGGER.info("Closing output stream");
                pipedOutputStream.close();
                LOGGER.info("output stream closed");
            } catch(IOException e) {
                LOGGER.error("Problem with closing output stream");
            }
        })
        .doOnCompleted(() -> LOGGER.info("Sending file completed"));
} catch(final IOException e) {
    return Completable.error(RuntimeException::new);
}
...