Rx Java. Изменить цепочку в составе - PullRequest
0 голосов
/ 10 февраля 2020

У меня есть код:

 return channelRepository.getRssFeedContent(channel.getSourceLink())
                .toObservable()
                .map(this::parseItem)
                .flatMapIterable(xmlItemRawObjects -> xmlItemRawObjects)
                .compose(/* question  */)
                .subscribeOn(Schedulers.from(threadExecutor))
                .observeOn(postExecutionThread.getScheduler());

Использование xmlItemRawObjects Мне нужно сделать запрос к базе данных, проверить, существует ли запись. Если записи не существует, верните те же xmlItemRawObjects из compose (), чтобы продолжить работу с ней. Если запись существует в базе данных, убедитесь, что compose () пропускает элемент для потока.

Я попытался создать функцию:

        .compose(new ObservableTransformer<XmlItemRawObject, XmlItemRawObject>() {
            @Override
            public ObservableSource<XmlItemRawObject> apply(Observable<XmlItemRawObject> upstream) {
               Boolean isExists = false;
               Observable<Item> test = Observable.create(emitter -> {
                   upstream
                           .flatMap(xmlItemRawObject -> channelRepository.getItemByUniqueId(xmlItemRawObject.getGuid())
                           .subscribe(item -> isExists = true));
               });
            }
        })

, но она работает просто отлично. Спасибо за ваш совет.

PS Нет примеров от compose () вообще.

Я пытался решить проблему с помощью flatmap:

        .flatMap(new Function<XmlItemRawObject, ObservableSource<XmlItemRawObject>>() {
            @Override
            public ObservableSource<XmlItemRawObject> apply(XmlItemRawObject xmlItemRawObject) throws Exception {
                                       Observable<XmlItemRawObject> test = Observable.create(emitter -> {
                    channelRepository.getItemByUniqueId(xmlItemRawObject.getGuid())
                            .subscribe(
                                    item -> {
                                        emitter.onComplete();
                                    }, throwable -> {}, () -> Observable.just(xmlItemRawObject));
                });
                return test.defaultIfEmpty(xmlItemRawObject);
            }})

Can ' t проверить пустой результат «подзапроса», создать конструкцию if-else для передачи xmlItemRawObject по цепочке

...