Rx.NET Возьми элемент и подпишись через некоторое время - PullRequest
0 голосов
/ 12 ноября 2018

Мне нужен вид дросселирования, который работает немного по-другому. Мне нужно получить элемент из последовательности, отписаться и подписаться снова через 1 сек. Другими словами, я хочу игнорировать все элементы в течение 1 секунды после взятия первого элемента:

Input:  (1) -100ms- (2) -200ms- (3) -1_500ms- (4) -1_000ms- (5) -500ms- (6) ...
Output: (1) --------------------------------- (4) --------- (5) ----------- ...

Как можно достичь этой простой вещи с Rx.NET?

Ответы [ 2 ]

0 голосов
/ 13 ноября 2018

@ Ответ Enigmativity не совсем соответствует спецификации. Это может работать на то, что вы хотите, хотя.

Его ответ определяет 1-секундные окна и берет первое из каждого из этих окон. Это не гарантирует вам ни секунды молчания между предметами. Рассмотрим этот случай:

t     : ---------1---------2---------3
source: ------1---2------3---4----5--|
window: ---------|---------|---------|
spec  : ------1----------3-----------|
enigma: ------1---2----------4-------|

Ответ подразумевает, что вы хотите одну секунду из ничего после пункта 1. Следующий пункт после этого - 3, с тишиной, затем до конца. Вот этот тест закодирован:

var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable<int>(
    RxTest.OnNext(700.MsTicks(),  1),
    RxTest.OnNext(1100.MsTicks(), 2),
    RxTest.OnNext(1800.MsTicks(), 3),
    RxTest.OnNext(2200.MsTicks(), 4),
    RxTest.OnNext(2600.MsTicks(), 5),
    RxTest.OnCompleted<int>(3000.MsTicks())
);

var expectedResults = scheduler.CreateHotObservable<int>(
    RxTest.OnNext(700.MsTicks(),  1),
    RxTest.OnNext(1800.MsTicks(), 3),
    RxTest.OnCompleted<int>(3000.MsTicks())
);

var target = source
    .Window(() => Observable.Timer(TimeSpan.FromSeconds(1.0), scheduler))
    .SelectMany(xs => xs.Take(1));

var observer = scheduler.CreateObserver<int>();
target.Subscribe(observer);
scheduler.Start();
ReactiveAssert.AreElementsEqual(expectedResults.Messages, observer.Messages);

Я думаю, что лучший способ обойти это - решение на основе Scan с временными метками. По сути, вы храните последнее законное сообщение в памяти с отметкой времени, а если новое сообщение на одну секунду старше, выведите. В противном случае, не:

public static IObservable<T> TrueThrottle<T>(this IObservable<T> source, TimeSpan span)
{
    return TrueThrottle<T>(source, span, Scheduler.Default);
}

public static IObservable<T> TrueThrottle<T>(this IObservable<T> source, TimeSpan span, IScheduler scheduler)
{
    return source
        .Timestamp(scheduler)
        .Scan(default(Timestamped<T>), (state, item) => state == default(Timestamped<T>) || item.Timestamp - state.Timestamp > span
            ? item
            : state
        )
        .DistinctUntilChanged()
        .Select(t => t.Value);
}

Примечание. В тестовом коде используется Nuget Microsoft.Reactive.Testing и следующий вспомогательный класс:

public static class RxTest
{
    public static long MsTicks(this int i)
    {
        return TimeSpan.FromMilliseconds(i).Ticks;
    }

    public static Recorded<Notification<T>> OnNext<T>(long msTicks, T t)
    {
        return new Recorded<Notification<T>>(msTicks, Notification.CreateOnNext(t));
    }

    public static Recorded<Notification<T>> OnCompleted<T>(long msTicks)
    {
        return new Recorded<Notification<T>>(msTicks, Notification.CreateOnCompleted<T>());
    }

    public static Recorded<Notification<T>> OnError<T>(long msTicks, Exception e)
    {
        return new Recorded<Notification<T>>(msTicks, Notification.CreateOnError<T>(e));
    }
}
0 голосов
/ 13 ноября 2018

Попробуйте:

Input
    .Window(() => Observable.Timer(TimeSpan.FromSeconds(1.0)))
    .SelectMany(xs => xs.Take(1));

Вот тест:

var query =
    Observable
        .Interval(TimeSpan.FromSeconds(0.2))
        .Window(() => Observable.Timer(TimeSpan.FromSeconds(1.0)))
        .SelectMany(xs => xs.Take(1));

Получено:

0 
5 
10 
14 
19 
24 
29 
34 
39 

Скачок с 10 до 14 - это просто результатиспользования нескольких потоков, а не ошибка в запросе.

...