RxJava Добавление событий в поток на основе обратного вызова - PullRequest
0 голосов
/ 08 сентября 2018

Добавление некоторого кода, чтобы прояснить вопрос

//generates a sequence in the range from input value (+1) to input value (+9)
Observable<ColoredIntegerModel> getSequenceObservable(int value, int delay, int color) {
        return Observable.range(value+1,9)
                .map(i -> {
                    Log.d(TAG, "Value " + i
                            + " evaluating on " + Thread.currentThread().getName()
                            + " emitting item at " + System.currentTimeMillis());
                    try {
                        Thread.sleep(delay);
                    } catch (InterruptedException e) {

                    }
                    return new ColoredIntegerModel(i, color);
                });
    }

//creates a stream if say input =2 of numbers from 1-20 (input*2) such that the output is 1 (Red color) 2-10(green color) 11 (Red color) 11-20 (Green Color) 
    Observable<ColoredIntegerModel> getEventStream(int value) {
        return Observable.create(new ObservableOnSubscribe<ColoredIntegerModel>() {
            @Override
            public void subscribe(ObservableEmitter<ColoredIntegerModel> emitter) throws Exception {
                for (int i = 0; i < value; ++i) {
                    ColoredIntegerModel model = new ColoredIntegerModel(i*10, Color.RED);
                    emitter.onNext(model);
                    Observable<ColoredIntegerModel> more = getSequenceObservable(i*10, 100, Color.GREEN);
                    more.subscribe(new Consumer<ColoredIntegerModel>() {
                        @Override
                        public void accept(ColoredIntegerModel coloredIntegerModel) throws Exception {
                            emitter.onNext(coloredIntegerModel);
                        }
                    });
                }
            }
        });
    }

Код выше работает. Он печатает 1 (красный), 2-10 (зеленый), 11 (красный), 12-20, но я бы хотел более чистое решение. Я также не уверен, когда внутренняя подписка в getEventStream () может быть удалена.

Вопрос в основном в том, что getEventStream вызывает функцию для каждого излучения, которое также возвращает Observable. Это аналогично цепочке обещаний, в которой каждое отдельное обещание может возвращать ряд других обещаний. Надеюсь, что это проясняет любую путаницу в исходном вопросе.

Ответы [ 2 ]

0 голосов
/ 12 сентября 2018

Вы должны взглянуть на Оператор FlatMap

Короче говоря, он превращает каждый из элементов в Observable в свои Observable и объединяет их.

Самое простое решение вашей проблемы может быть что-то вроде:

getEventStream()
    .flatMap(it -> getSequenceObservable(it))
    .doOnNext(System.out::print)
    .blockingSubscribe();

Где вспомогательные функции

static Observable<ColoredIntegerModel> getEventStream() {
    return Observable.fromArray(
            new ColoredIntegerModel(10, Color.RED),
            new ColoredIntegerModel(20, Color.RED)
    );
}
static Observable<ColoredIntegerModel> getSequenceObservable(ColoredIntegerModel color) {
    return Observable.range(1, 10)
            .flatMap(it -> Observable.timer(it, TimeUnit.SECONDS)
                .map(time -> new ColoredIntegerModel(time.intValue(), Color.GREEN))
            );
}

Если вы хотите сохранить исходное значение от getEventStream(), вы можете использовать что-то вроде этого вместо getSequenceObservable

static Observable<ColoredIntegerModel> getSequenceObservable(ColoredIntegerModel color) {
     return Observable.range(1, 10)
             .flatMap(it -> Observable.timer(it, TimeUnit.MILLISECONDS)
                    .map(time -> new ColoredIntegerModel(time.intValue(), Color.GREEN)))
             .concatWith(Observable.just(color));
}

Если важен порядок эмиссии, используйте версию flatMap с maxConcurrency:

getEventStream()
            .flatMap(it -> getSequenceObservable(it), true, 1)
            .doOnNext(System.out::println)
            .blockingSubscribe();
0 голосов
/ 10 сентября 2018

Если вам нужно упростить приведенный выше код, чтобы все обработки подписок оставались на усмотрение конечного подписчика и поддерживался порядок передачи подпоследовательностей, вы можете сделать это следующим образом:

Observable<ColoredIntegerModel> getSequenceObservable(int value, int delay, int color) {
    return Observable.range(value+1, 9)
            .flatMap( i -> Observable
                    .just(new ColoredIntegerModel(i, color))
                    .delay(delay * (i + 1), TimeUnit.MILLISECONDS)
            )
            ;
}

Observable<ColoredIntegerModel> getEventStream(int value) {
    return Observable.range(0, value)
            .concatMap(i ->
                    getSequenceObservable(i * 10,100, Color.GREEN)
                            .startWith(new ColoredIntegerModel(i*10, Color.RED))
            )
            ;
}

то есть, если вам действительно нужна ручная задержка, если нет, просто замените getSequenceObservable на:

Observable<ColoredIntegerModel> getSequenceObservable(int value, int color) {
    return Observable.range(value+1, 9)
            .map(i -> new ColoredIntegerModel(i, color))
            ;
}
...