Как получить все выбросы после отката - PullRequest
0 голосов
/ 08 июля 2019

У меня есть наблюдаемая цепочка, которая генерирует данные в группах.

например

1-2-3-4-5 ----------- 1-2-3-5-6 ------------ 5-3-2- 1-9 ------....

каждое число является эмиттером, и мне нужно следующее:

[1,2,3,4,5] ---------- [1,2,3,5,6] --------- [5,3,2 , 1,9] ----....

так что в этом случае вместо 15 испускается целое число. Я получу 3 выпуска массива целых чисел.

Разница между подключенными излучениями обычно составляет менее 100 миллисекунд, а разница между различными группами обычно превышает несколько минут. поэтому я подумал о том, чтобы сделать какую-то комбинацию Debounce и сканирования, чтобы преобразовать выбросы в массив, но я не нашел ничего подходящего для меня.

есть какой-нибудь изящный оператор RxJava2, который помогает мне?

Я сделал следующее

        List<IncomeMessage> messages = new ArrayList<>();
        incomeMessageSubject.doOnNext(messages::add)
                .debounce(1000, TimeUnit.MILLISECONDS)
                .map(incomeMessage -> {
                    List<IncomeMessage> tempMessages = new ArrayList<>(messages);
                    messages.clear();
                    return tempMessages;
                })
                .subscribe();

но мне было интересно, есть ли решение, которое встроено в качестве оператора и не требует наличия списка, которым мне нужно управлять вручную.

1 Ответ

0 голосов
/ 08 июля 2019

Для этого нет встроенного оператора, и у вашего решения есть риск испортить список, потому что doOnNext и map могут работать одновременно из-за debounce.

Вы можете использовать буфер с границей, которая запускается тем же источником с отклонениями:

incomeMessageSubject
.buffer(incomingMessageSubject.debounce(1000, TimeUnit.MILLISECONDS))
.subscribe(/* consume list */);

Если ваш источник холодный, вы должны использовать publish():

source
.publish(shared -> shared.buffer(shared.debounce(1, SECONDS)))
.subscribe(/* consume list */);
...