Вы должны взглянуть на Оператор FlatMap
Короче говоря, он превращает каждый из элементов в Observable
в свои Observable
и объединяет их.
Самое простое решение вашей проблемы может быть что-то вроде:
getEventStream()
.flatMap(it -> getSequenceObservable(it))
.doOnNext(System.out::print)
.blockingSubscribe();
Где вспомогательные функции
static Observable<ColoredIntegerModel> getEventStream() {
return Observable.fromArray(
new ColoredIntegerModel(10, Color.RED),
new ColoredIntegerModel(20, Color.RED)
);
}
static Observable<ColoredIntegerModel> getSequenceObservable(ColoredIntegerModel color) {
return Observable.range(1, 10)
.flatMap(it -> Observable.timer(it, TimeUnit.SECONDS)
.map(time -> new ColoredIntegerModel(time.intValue(), Color.GREEN))
);
}
Если вы хотите сохранить исходное значение от getEventStream()
, вы можете использовать что-то вроде этого вместо getSequenceObservable
static Observable<ColoredIntegerModel> getSequenceObservable(ColoredIntegerModel color) {
return Observable.range(1, 10)
.flatMap(it -> Observable.timer(it, TimeUnit.MILLISECONDS)
.map(time -> new ColoredIntegerModel(time.intValue(), Color.GREEN)))
.concatWith(Observable.just(color));
}
Если важен порядок эмиссии, используйте версию flatMap с maxConcurrency:
getEventStream()
.flatMap(it -> getSequenceObservable(it), true, 1)
.doOnNext(System.out::println)
.blockingSubscribe();