Могу ли я получить полную последовательность в Rx вместо того, чтобы получать только последний объект, о котором идет речь? - PullRequest
0 голосов
/ 07 мая 2018

enter image description here

Я хочу знать, был ли один из поврежденных объектов зеленым шаром.Фильтрация только зеленых шариков до или после отказов приводит к некорректному поведению.

Ответы [ 3 ]

0 голосов
/ 07 мая 2018

Это можно сделать с помощью нетривиального набора операторов и побочного воздействия на поток путем введения дополнительных каналов:

import java.util.Queue;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

import org.junit.Test;

import io.reactivex.*;
import io.reactivex.functions.Consumer;
import io.reactivex.schedulers.*;
import io.reactivex.subjects.PublishSubject;

public class DebounceTimeDrop {

    @Test
    public void test() {
        PublishSubject<Integer> source = PublishSubject.create();

        TestScheduler scheduler = new TestScheduler();

        source.compose(debounceTime(10, TimeUnit.MILLISECONDS, scheduler, v -> {
            System.out.println(
                    "Dropped: " + v + " @ T=" + scheduler.now(TimeUnit.MILLISECONDS));
        }))
        .subscribe(v -> System.out.println(
                "Passed: " + v + " @ T=" + scheduler.now(TimeUnit.MILLISECONDS)),
                Throwable::printStackTrace, 
                () -> System.out.println(
                        "Done "  + " @ T=" + scheduler.now(TimeUnit.MILLISECONDS)));

        source.onNext(1);
        scheduler.advanceTimeBy(10, TimeUnit.MILLISECONDS);

        scheduler.advanceTimeBy(20, TimeUnit.MILLISECONDS);

        source.onNext(2);
        scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);
        source.onNext(3);
        scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);
        source.onNext(4);
        scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);
        source.onNext(5);
        scheduler.advanceTimeBy(10, TimeUnit.MILLISECONDS);

        scheduler.advanceTimeBy(20, TimeUnit.MILLISECONDS);

        source.onNext(6);
        scheduler.advanceTimeBy(10, TimeUnit.MILLISECONDS);

        scheduler.advanceTimeBy(20, TimeUnit.MILLISECONDS);

        source.onComplete();
    }

    public static <T> ObservableTransformer<T, T> debounceTime(
            long time, TimeUnit unit, Scheduler scheduler, 
            Consumer<? super T> dropped) {
        return o -> Observable.<T>defer(() -> {
            AtomicLong index = new AtomicLong();
            Queue<Timed<T>> queue = new ConcurrentLinkedQueue<>();

            return o.map(v -> {
                Timed<T> t = new Timed<>(v, 
                    index.getAndIncrement(), TimeUnit.NANOSECONDS);
                queue.offer(t);
                return t;
            })
            .debounce(time, unit, scheduler)
            .map(v -> {
                while (!queue.isEmpty()) {
                    Timed<T> t = queue.peek();
                    if (t.time() < v.time()) {
                        queue.poll();
                        dropped.accept(t.value());
                    } else
                    if (t == v) {
                        queue.poll();
                        break;
                    }
                }
                return v.value();
            })
            .doOnComplete(() -> {
                while (!queue.isEmpty()) {
                    dropped.accept(queue.poll().value());
                }
            });
        });
    }
}

печать

Passed: 1 @ T=10
Dropped: 2 @ T=43
Dropped: 3 @ T=43
Dropped: 4 @ T=43
Passed: 5 @ T=43
Passed: 6 @ T=73
Done  @ T=93
0 голосов
/ 07 мая 2018

Вы можете использовать оператор буфера вместе с оператором debounce. Вот очень простой пример:

// This is our event stream. In this example we only track mouseup events on the document
const move$ = Observable.fromEvent(document, 'mouseup');

// We want to create a debounced version of the initial stream
const debounce$ = move$.debounceTime(1000);

// Now create the buffered stream from the initial move$ stream. 
// The debounce$ stream can be used to emit the values that are in the buffer
const buffered$ = move$.buffer(debounce$);

// Subscribe to your buffered stream
buffered$.subscribe(res => console.log('Buffered Result: ', res));
0 голосов
/ 07 мая 2018

Если я правильно понимаю, чего вы хотите достичь, вам, вероятно, нужно построить Observable, который испускает какой-то объект, который содержит как исходное значение (т.е. синий, красный, зеленый в вашем случае), так и флаг, который указывает был ли зеленый цвет в опровергнутых значениях.

Если это правда, вы можете попытаться написать код по этим направлениям

const s = new Subject<string>();

setTimeout(() => s.next('B'), 100);
setTimeout(() => s.next('G'), 1100);
setTimeout(() => s.next('B'), 1200);
setTimeout(() => s.next('G'), 1300);
setTimeout(() => s.next('R'), 1400);
setTimeout(() => s.next('B'), 2400);

let hasGreen = false;

s
.do(data => hasGreen =  hasGreen || data === 'G')
.debounceTime(500)
.map(data => ({data, hasGreen})) // this map has to come before the following do
.do(() => hasGreen =  false)
.subscribe(data => console.log(data))

Будьте осторожны с последовательностью. В частности, вы должны поместить оператор map, который создает объект, который вы хотите испустить, перед do, который сбрасывает вашу переменную.

...