RxJava3: debounce / пропустить нежелательные «0» в потоке целых чисел, но хотят «0» только при непрерывном получении - PullRequest
2 голосов
/ 21 января 2020

У меня есть Observable типа Integer, который испускает 0 и положительные числа, и Observer, чтобы поймать оба.

Observable испускает непрерывные нули или непрерывные положительные числа каждые 10 миллисекунд ( Один элемент на время ).

Как и 0,0,0,0,0,0,0,5,5,6,7,8,8,9,10,11,4,5,6,5,0,0,0,0,0... [ожидается]

Также он выдает ноль между положительными числами (редко).

Как и 0,0,0,0,0,0,0,5,5,6,7,8,8,**0**,9,10,11,4,5,6,5,0,0,0,0,0... [неожиданно]

Я хочу отменить / пропустить этот ноль между двумя положительными числами, но хочу поймать 0, если он непрерывный.

Существует ли комбинация операторов для достижения этой цели в rx java. Заранее спасибо.

Код выглядит примерно так:

public Observer<Integer> valueObserver = new DisposingObserver<Integer>() {
        @Override
        public void onNext(Integer value) {
         //every 10 seconds a value is received
         //do action based on zero or non-zero values.
        }
    };

Observable<Integer> sourceObservable = Observable.just(0,0,0,0,0,0,0,5,5,6,7,8,8,0,9,10,11,4,5,6,5,0,0,0,0,0,...);
sourceObservable.subscribe(valueObserver);

ожидаемый o / p: 0,5,6,7,8,9,10,11,4,5, 6,5,0

Последовательные повторяющиеся элементы, которые я мог бы удалить с помощью оператора DifferentUntilChanged.

1 Ответ

2 голосов
/ 22 января 2020

Это требует запоминания того, сколько последующих нулей поток создал до сих пор и ничего не испускает, один ноль, два нуля или ненулевой элемент. Вы можете сделать это с помощью flatMapIterable, но подсчет нулей выполняется с учетом состояния, поэтому требуется defer:

static ObservableTransformer<Integer, Integer> skipSingleZero() {
    return source -> Observable.defer(() -> {
        AtomicInteger zerosSeen = new AtomicInteger();

        return source
        .flatMapIterable(item -> {
            if (item == 0) {
                int n = zerosSeen.getAndIncrement();
                // first zero we saw, don't emit it
                if (n == 0) {
                    return Collections.emptyList();
                }
                // the second zero in a row, it means a streak so emit both
                if (n == 1) {
                    return Arrays.asList(0, 0);
                }
                // third or more zeros, just emit those now on
                return Collections.singletonList(0);
            }
            // a non-zero item, reset the counter
            zerosSeen.set(0);
            return Collections.singletonList(item);
        });
    });
}

Это последовательность с одним одиноким нулем в середине, поэтому ее следует пропустить:

Observable<Integer> source = Observable.fromArray(
        0, 0, 0, 0, 0, 0, 0, 5, 5, 6, 7, 8, 8,
        0, 9, 10, 11, 4, 5, 6, 5, 0, 0, 0, 0, 0
);

source
.compose(skipSingleZero())
.test()
.assertResult(
        0, 0, 0, 0, 0, 0, 0, 5, 5, 6, 7, 8, 8,
        9, 10, 11, 4, 5, 6, 5, 0, 0, 0, 0, 0
);

Это имеет 2 полосы в середине и должно быть сохранено:

Observable<Integer> source = Observable.fromArray(
        0, 0, 0, 0, 0, 0, 0, 5, 5, 6, 7, 8, 8,
        0, 0, 9, 10, 11, 4, 5, 6, 5, 0, 0, 0, 0, 0
);
source
.compose(skipSingleZero())
.test()
.assertResult(
        0, 0, 0, 0, 0, 0, 0, 5, 5, 6, 7, 8, 8,
        0, 0, 9, 10, 11, 4, 5, 6, 5, 0, 0, 0, 0, 0
);
...