Это требует запоминания того, сколько последующих нулей поток создал до сих пор и ничего не испускает, один ноль, два нуля или ненулевой элемент. Вы можете сделать это с помощью flatMapIterable
, но подсчет нулей выполняется с учетом состояния, поэтому требуется defer
:
static ObservableTransformer<Integer, Integer> skipSingleZero() {
return source -> Observable.defer(() -> {
AtomicInteger zerosSeen = new AtomicInteger();
return source
.flatMapIterable(item -> {
if (item == 0) {
int n = zerosSeen.getAndIncrement();
// first zero we saw, don't emit it
if (n == 0) {
return Collections.emptyList();
}
// the second zero in a row, it means a streak so emit both
if (n == 1) {
return Arrays.asList(0, 0);
}
// third or more zeros, just emit those now on
return Collections.singletonList(0);
}
// a non-zero item, reset the counter
zerosSeen.set(0);
return Collections.singletonList(item);
});
});
}
Это последовательность с одним одиноким нулем в середине, поэтому ее следует пропустить:
Observable<Integer> source = Observable.fromArray(
0, 0, 0, 0, 0, 0, 0, 5, 5, 6, 7, 8, 8,
0, 9, 10, 11, 4, 5, 6, 5, 0, 0, 0, 0, 0
);
source
.compose(skipSingleZero())
.test()
.assertResult(
0, 0, 0, 0, 0, 0, 0, 5, 5, 6, 7, 8, 8,
9, 10, 11, 4, 5, 6, 5, 0, 0, 0, 0, 0
);
Это имеет 2 полосы в середине и должно быть сохранено:
Observable<Integer> source = Observable.fromArray(
0, 0, 0, 0, 0, 0, 0, 5, 5, 6, 7, 8, 8,
0, 0, 9, 10, 11, 4, 5, 6, 5, 0, 0, 0, 0, 0
);
source
.compose(skipSingleZero())
.test()
.assertResult(
0, 0, 0, 0, 0, 0, 0, 5, 5, 6, 7, 8, 8,
0, 0, 9, 10, 11, 4, 5, 6, 5, 0, 0, 0, 0, 0
);