У меня есть код:
return channelRepository.getRssFeedContent(channel.getSourceLink())
.toObservable()
.map(this::parseItem)
.flatMapIterable(xmlItemRawObjects -> xmlItemRawObjects)
.compose(/* question */)
.subscribeOn(Schedulers.from(threadExecutor))
.observeOn(postExecutionThread.getScheduler());
Использование xmlItemRawObjects Мне нужно сделать запрос к базе данных, проверить, существует ли запись. Если записи не существует, верните те же xmlItemRawObjects из compose (), чтобы продолжить работу с ней. Если запись существует в базе данных, убедитесь, что compose () пропускает элемент для потока.
Я попытался создать функцию:
.compose(new ObservableTransformer<XmlItemRawObject, XmlItemRawObject>() {
@Override
public ObservableSource<XmlItemRawObject> apply(Observable<XmlItemRawObject> upstream) {
Boolean isExists = false;
Observable<Item> test = Observable.create(emitter -> {
upstream
.flatMap(xmlItemRawObject -> channelRepository.getItemByUniqueId(xmlItemRawObject.getGuid())
.subscribe(item -> isExists = true));
});
}
})
, но она работает просто отлично. Спасибо за ваш совет.
PS Нет примеров от compose () вообще.
Я пытался решить проблему с помощью flatmap:
.flatMap(new Function<XmlItemRawObject, ObservableSource<XmlItemRawObject>>() {
@Override
public ObservableSource<XmlItemRawObject> apply(XmlItemRawObject xmlItemRawObject) throws Exception {
Observable<XmlItemRawObject> test = Observable.create(emitter -> {
channelRepository.getItemByUniqueId(xmlItemRawObject.getGuid())
.subscribe(
item -> {
emitter.onComplete();
}, throwable -> {}, () -> Observable.just(xmlItemRawObject));
});
return test.defaultIfEmpty(xmlItemRawObject);
}})
Can ' t проверить пустой результат «подзапроса», создать конструкцию if-else для передачи xmlItemRawObject по цепочке