Как остановить RXJava 2 от выдачи большего количества ошибок после первой? - PullRequest
0 голосов
/ 03 апреля 2019

с использованием RX Java 2.2.6.

Я понимаю, что, когда выдается одна ошибка, полный поток останавливается.Но если выдается несколько ошибок, я получаю ...

The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere to go to begin with.

Так что в следующей ситуации моя наблюдаемая создается из потока файлов, который отправляет строку за строкой.Когда экземпляр myService (службы http), например, не работает, выдается несколько ошибок.Как несколько строк были переданы на службу.Таким образом, если файл имел 10 строк, первая ошибка завершит наблюдаемую, но тогда будет выпущена еще одна 9-ти ошибка, но в это время наблюдаемая закрыта, и я хочу, чтобы она закрылась.

Должен ли я переключить myService наОдиночный, а не Завершаемый?

На данный момент myService основан на обработчике и объясняет, почему я создаю оболочку Completable.

observable
  .skipLast(1)
  .map(buffer -> new JsonArray(buffer))
  .flatMap(record ->
    Completable.create(emitter -> {
      if (record.size() > 13) {
        myService.send(..., writeResult -> {
          if (writeResult.succeeded()) {
            emitter.onComplete();
          } else {
            emitter.onError(writeResult.cause());
          }
        });
      } else {
        emitter.onError(new IllegalArgumentException("Record contains invalid amount of elements. Found: " + record.size() + " elements."));
      }
    }).toObservable())
  .doOnComplete(() -> {
    resultHandler.handle(Future.succeededFuture(successCount.longValue()));
  }).subscribe(
  o -> {
  },
  t -> {
    resultHandler.handle(Future.failedFuture(t));
  });

Обновление: Проверка, еслипохоже, что излучатель добился цели?

observable
.skipLast(1)
.map(buffer -> new JsonArray(buffer.getDelegate()))
.flatMap(record ->
  Completable.create(emitter -> {

          myService.send(..., writeResult -> {
            if (writeResult.succeeded()) {
              successCount.getAndIncrement();

              if (!emitter.isDisposed())
                emitter.onComplete();
            } else {
              if (!emitter.isDisposed())
                emitter.onError(writeResult.cause());
            }
          });
        } else {
          if(!emitter.isDisposed())
            emitter.onError(new IllegalArgumentException("Invalid topic."));
        }
  }).toObservable())
.doOnComplete(() -> {
  resultHandler.handle(Future.succeededFuture(successCount.longValue()));
}).subscribe(
o -> {
  // Do nothing
},
t ->
  resultHandler.handle(Future.failedFuture(new IllegalStateException("Failed at line: " + successCount.get(), t)))
);

1 Ответ

0 голосов
/ 11 апреля 2019

Я верю flatMapCompletable сделает свое дело:

fun observeRecordings(): Observable<Unit> =
        Observable.just(20)
                .skipLast(1)
                .map { JsonArray(it) }
                .filter { it.size() > 13 }
                .flatMapCompletable { service.send(it) }
                .doOnError { }
                .toObservable()

Извините за использование Kotlin!

...