Как обработать все события, испускаемые RX Java, независимо от ошибки? - PullRequest
0 голосов
/ 29 августа 2018

Я использую веб-каркас vertx.io для отправки списка элементов на нижестоящий HTTP-сервер.

records.records() испускает 4 записи, и я специально настроил веб-клиент для подключения к неправильному I.P / порту.

Processing... печатает 4 раза.

Exception outer! 3 раза.

Если я верну правильный порт I.P /, то Susbscribe outer! печатает 4 раза.

io.reactivex.Flowable
    .fromIterable(records.records())
    .flatMap(inRecord -> {
        System.out.println("Processing...");

        // Do stuff here....
        Observable<Buffer> bodyBuffer = Observable.just(Buffer.buffer(...));

        Single<HttpResponse<Buffer>> request = client
          .post(..., ..., ...)
          .rxSendStream(bodyBuffer);

        return request.toFlowable();
    })
    .subscribe(record -> {
        System.out.println("Subscribe outer!");
    }, ex -> {
        System.out.println("Exception outer! " + ex.getMessage());
    });

UPDATE:

Теперь я понимаю, что при ошибке RX останавливается правильно. Есть ли способ продолжить и обработать все записи независимо и получить ошибку для каждой?

Ответы [ 2 ]

0 голосов
/ 31 августа 2018

Учитывая эту статью: https://medium.com/@jagsaund/5-not-so-obvious-things-about-rxjava-c388bd19efbc

Я придумал это ... Разве вы не видите что-то не так с этим?

io.reactivex.Flowable
.fromIterable(records.records())
.flatMap
(inRecord -> {
    Observable<Buffer> bodyBuffer = Observable.just(Buffer.buffer(inRecord.toString()));

    Single<HttpResponse<Buffer>> request = client
            .post("xxxxxx", "xxxxxx", "xxxxxx")
            .rxSendStream(bodyBuffer);

    // So we can capture how long each request took.
    final long startTime = System.currentTimeMillis();

    return request.toFlowable()
        .doOnNext(response -> {
            // Capture total time and print it with the logs. Removed below for brevity.
            long processTimeMs = System.currentTimeMillis() - startTime;

            int status = response.statusCode();

            if(status == 200)
                logger.info("Success!");
            else
                logger.error("Failed!");
        }).doOnError(ex -> {
                long processTimeMs = System.currentTimeMillis() - startTime;

                logger.error("Failed! Exception.", ex);
        }).doOnTerminate(() -> {
          // Do some extra stuff here... 
        }).onErrorResumeNext(Flowable.empty()); // This will allow us to continue.
    }
).subscribe(); // Don't handle here. We subscribe to the inner events.
0 голосов
/ 31 августа 2018

Есть ли способ продолжить и обработать все записи независимо и получить ошибка для каждого?

Согласно doc , наблюдаемое должно быть прекращено, если оно обнаружит ошибку. Таким образом, вы не можете получить каждую ошибку в onError.

Вы можете использовать onErrorReturn или onErrorResumeNext(), чтобы сообщить вышестоящему, что делать, если он обнаружит ошибку (например, emit null или Flowable.empty()).

...