Буфер N значений в секунду RxJava, Project Reactor - PullRequest
0 голосов
/ 20 октября 2018

У меня есть поток с некоторыми значениями:

Flux<Integer> stream = getStream();

И я пытался реализовать N элементов в секунду

stream.bufferTimeout(MAX_SIZE_TWO, _1_SECOND).subscribe(val => {
  System.out.println(val);
});

Я пытаюсьОператор поиска близок к ожидаемому результату.

Ожидаемый результат:

time: 15:00:00, stream_next_value: 1, output: {1}
time: 15:00:00, stream_next_value: 2, output: {2}
time: 15:00:00, stream_next_value: 3, no output => buffer
time: 15:00:00, stream_next_value: 4, no output => buffer
time: 15:00:00, stream_next_value: 5, no output => buffer
time: 15:00:01, stream_no_next_value, output: {3,4}
time: 15:00:01, stream_next_value: 6, no output => buffer
time: 15:00:02, stream_no_next_value, output: {5,6}

Но похоже, что перегруженные версии оператора буфера не поддерживают это поведение.

Как добитьсяожидаемое поведение с использованием буфера оператора?

1 Ответ

0 голосов
/ 24 октября 2018

Может быть, вы можете сделать это:

Flowable<Long> stream = Flowable.generate(() -> 0L, (next, emitter) -> {
        emitter.onNext(next);
        return next + 1;
});

// Flowable<Long> stream = Flowable.interval(100, MILLISECONDS);
//                                 .onBackpressureDrop(); // to make it works otherwise get a MissingBackPressureException

stream.buffer(2)
      .zipWith(Flowable.interval(1, SECONDS), (first, second) -> first)
      .flatMap(Flowable::fromIterable)
      .subscribe(s -> LOGGER.info("received: " + s),
                 Throwable::printStackTrace);

Остерегайтесь stream, должны учитывать обратное давление, в противном случае вам нужно добавить оператор onBackpressureXXX() (например, это будет иметь место, если поток будетinterval() (см. прокомментированный код)).Вы получаете такой вывод:

14:39:59.538 | INFO  | RxComputationThreadPool-1 | received: 0
14:39:59.540 | INFO  | RxComputationThreadPool-1 | received: 1
14:40:00.527 | INFO  | RxComputationThreadPool-1 | received: 2
14:40:00.528 | INFO  | RxComputationThreadPool-1 | received: 3
14:40:01.528 | INFO  | RxComputationThreadPool-1 | received: 4
14:40:01.528 | INFO  | RxComputationThreadPool-1 | received: 5
14:40:02.528 | INFO  | RxComputationThreadPool-1 | received: 6
14:40:02.528 | INFO  | RxComputationThreadPool-1 | received: 7
14:40:03.528 | INFO  | RxComputationThreadPool-1 | received: 8
14:40:03.528 | INFO  | RxComputationThreadPool-1 | received: 9
...