Задержки тестирования в Rx - PullRequest
1 голос
/ 17 июня 2020

Я пытаюсь понять, как можно go протестировать следующую функцию, которая добавляет мониторинг внутренней очереди Observable.ObserveOn.

public IObservable<T> MonitorBuffer<T>(IObservable<T> source, Action<int> monitor, TimeSpan interval, IScheduler scheduler)
{
    return Observable.Create<T>(ob =>
    {
        int count = 0;

        return new CompositeDisposable(source
            .Do(_ => Interlocked.Increment(ref count))
            .ObserveOn(scheduler)
            .Do(_ => Interlocked.Decrement(ref count))
            .Subscribe(ob),
            Observable.Interval(interval, scheduler).Select(_ => count).DistinctUntilChanged().Subscribe(monitor)
        );
    });
}

Я предполагаю что-то вроде этого:

var ts = new TestScheduler();
var input = Enumerable.Range(1, 8).Select(i => OnNext(i * 10, i)).ToArray();
var hot = ts.CreateHotObservable(input);
var observer = ts.CreateObserver<int>();
var log = new Subject<int>();
var monitor = ts.CreateObserver<int>();
var ticks = TimeSpan.FromTicks(5);
var buffered = MonitorBuffer(hot, log.OnNext, ticks, ts);
log.Subscribe(monitor);

buffered.Do(x => { /*if(x == 3) Introduce delay here */}).Subscribe(observer);

ts.AdvanceTo(100);
observer.Messages.AssertEqual(...);
monitor.Messages.AssertEqual(...);

Вопрос в том, что я могу добавить в Do, чтобы получить желаемый эффект временной задержки нисходящего потока.

Я ищу примерно такие результаты:

//time:    0--------10--------20--------30--------40--------50--------60--------70--------
//source:  ---------1---------2---------3---------4---------5---------6---------7---------
//output:  ---------1---------2-----------------------------345-------6---------7---------
//log:     ----0-------------------------1---------2---------2----0-----------------------

(NB: я задавал аналогичный вопрос какое-то время go, но это было не очень ясно, и сейчас немного поздно для полной перезаписи).

1 Ответ

1 голос
/ 18 июня 2020

Думаю, я это прибил ...

Секрет в том, чтобы иметь два планировщика, которые можно продвигать независимо.

Основываясь на тестовом коде в вопросе:

var inputscheduler = new TestScheduler();
(...)

//different scheduler for buffer/observeOn
var bufferScheduler = new TestScheduler();
var buffered = MonitorBuffer(hot, log.OnNext, ticks, bufferScheduler);
log.Subscribe(monitor);
buffered.Subscribe(observer);

//instead of inserting something downstream, use scheduler advances
for (int i = 3; i < 80; i++)
{
    inputscheduler.AdvanceTo(i);
    if (i < 25|| i > 45) bufferscheduler.AdvanceTo(i);
}

observer.Messages.AssertEqual(...);
monitor.Messages.AssertEqual(...);
...