Я использую веб-каркас vertx.io для отправки списка элементов на нижестоящий HTTP-сервер.
records.records()
испускает 4 записи, и я специально настроил веб-клиент для подключения к неправильному I.P / порту.
Processing...
печатает 4 раза.
Exception outer!
3 раза.
Если я верну правильный порт I.P /, то Susbscribe outer!
печатает 4 раза.
io.reactivex.Flowable
.fromIterable(records.records())
.flatMap(inRecord -> {
System.out.println("Processing...");
// Do stuff here....
Observable<Buffer> bodyBuffer = Observable.just(Buffer.buffer(...));
Single<HttpResponse<Buffer>> request = client
.post(..., ..., ...)
.rxSendStream(bodyBuffer);
return request.toFlowable();
})
.subscribe(record -> {
System.out.println("Subscribe outer!");
}, ex -> {
System.out.println("Exception outer! " + ex.getMessage());
});
UPDATE:
Теперь я понимаю, что при ошибке RX останавливается правильно. Есть ли способ продолжить и обработать все записи независимо и получить ошибку для каждой?