То, что я в итоге сделал, это:
/**
* Start loading the Twilio chat channel pages
*
* @return Single containing a list of all the chat channel pages
*/
public Single<List<Paginator<ChannelDescriptor>>> loadAllChatChannelPages(ChatClientManager chatClientManager) {
return Single.create(emitter -> chatClientManager.getChatClient().getChannels().getUserChannelsList(new CallbackListener<Paginator<ChannelDescriptor>>() {
@Override
public void onSuccess(Paginator<ChannelDescriptor> channelDescriptorPaginator) {
if(channelDescriptorPaginator != null) {
emitter.onSuccess(channelDescriptorPaginator);
}
}
@Override
public void onError(ErrorInfo errorInfo) {
String errorMessage = "";
if(errorInfo != null) {
errorMessage = errorInfo.getMessage();
}
emitter.onError(new Throwable(errorMessage));
}
})).subscribeOn(Schedulers.io())
.flatMap(firstPaginator -> {
if(firstPaginator != null) {
return loadChannelPaginator((Paginator<ChannelDescriptor>) firstPaginator).toList()
.subscribeOn(Schedulers.io());
} else {
return Single.error(new Throwable("Could not get chat channels"));
}
});
}
/**
* Recursively loads the chat channel pages and returns them as a single observable
*
* @param paginator this needs to be the first chat channel paginator from the chat client
* @return Observable containing a flattened version of all the available chat channel paginators
*/
private Observable<Paginator<ChannelDescriptor>> loadChannelPaginator(Paginator<ChannelDescriptor> paginator) {
if (paginator.hasNextPage()) {
return Observable.mergeDelayError(
Observable.just(paginator),
loadNextChatChannelPage(paginator)
.flatMap(this::loadChannelPaginator));
}
return Observable.just(paginator);
}
/**
* Loads a single chat channel page
*
* @param previousPage the previous page of chat channels
* @return Observable containing the next chat channel page
*/
private Observable<Paginator<ChannelDescriptor>> loadNextChatChannelPage(Paginator<ChannelDescriptor> previousPage) {
return Observable.create(emitter -> previousPage.requestNextPage(new CallbackListener<Paginator<ChannelDescriptor>>() {
@Override
public void onSuccess(Paginator<ChannelDescriptor> channelDescriptorPaginator) {
if(channelDescriptorPaginator != null) {
emitter.onNext(channelDescriptorPaginator);
}
emitter.onComplete();
}
@Override
public void onError(ErrorInfo errorInfo) {
if(errorInfo != null) {
String errorMessage = errorInfo.getMessage();
Timber.e(errorMessage);
}
// emitter.onError(new Throwable(errorMessage));
emitter.onComplete();
}
}));
}
В приведенном выше коде loadAllChatChannelPages загружает первый paginator.
Если это не ноль, то loadChannelPaginator вступает во владение и рекурсивно захватывает каждый следующий paginatorвыполняя loadNextChatChannelPage, метод, который возвращает наблюдаемое для каждого отдельного пагинатора.
Затем mergeDelayError сглаживает все пагинаторы и возвращает их как один отдельный наблюдаемый.
Наконец, в getAllChannels я применяю Observable.toList (), это возвращает сингл, содержащий список нужных мне каналов чата.