Как найти недоступные объекты в couchbase при использовании RxJava? - PullRequest
0 голосов
/ 22 марта 2019

Требование: Массовое чтение группы объектов из базы. Даже если один объект недоступен, выдается исключение с сообщением о том, что следующие объекты недоступны.

Подход: Мы используем RxJava и asyncBucket для чтения этих объектов из БД.

Код:

        final List<String> failedToReadOrders = new ArrayList<>();
        final List<Order> list = Observable
                .from(ids)
                .subscribeOn(Schedulers.io())
                .flatMap(id -> asyncBucket
                        .get(id)
                        .doOnError(ex-> {
                            failedToReadOrders.add(id);
                            LOGGER.error("Error occured while reading order with ID key={}",id);
                            })
                        .retryWhen(retryFunc())
                        .onErrorResumeNext(Observable.empty()))
                .map(doc-> couchbaseConversionService.convertJsonToJavaObject(doc.content(), Order.class))
                .toList()
                .toBlocking()
                .single();

Метод doOnError вызывается только в том случае, если их исключение выдается методом asyncBucket.get (). Однако, если элемент недоступен, то в документации asyncBucket.get () говорится, что «Если документ не найден, Observable завершается без отправленного элемента».

Вопрос: Конечно, я могу узнать, какие элементы не были прочитаны, найдя разницу между списком идентификаторов для чтения и возвращенными объектами. Однако возможно ли собрать эти идентификаторы в самом коде выше, чтобы мне больше не пришлось повторять цикл в списке?

1 Ответ

1 голос
/ 28 марта 2019
List<Order> list = Observable
    .from(ids)
    .subscribeOn(Schedulers.io())
    .flatMap(id -> asyncBucket
        .get(id)
        .switchIfEmpty(Observable.error(new Throwable("No data found")))
        .doOnError(ex-> {
            failedToReadOrders.add(id);
            LOGGER.error("Error occured while reading order with ID key={}",id);
        })
        .retryWhen(retryFunc())
        .onErrorResumeNext(Observable.empty()))
    .map(doc-> couchbaseConversionService.convertJsonToJavaObject(doc.content(), Order.class))
    .toList()
    .toBlocking()
    .single();
...