Rx: Как я могу ответить сразу, и дросселировать последующие запросы - PullRequest
27 голосов
/ 03 ноября 2011

Я хотел бы настроить подписку Rx, которая может сразу реагировать на событие, а затем игнорировать последующие события, которые происходят в течение указанного периода «перезарядки».

Стандартные методы Throttle / Buffer отвечают только по истечении времени ожидания, что не совсем то, что мне нужно.

Вот код, который устанавливает сценарий и используетДроссель (что не является решением, которое я хочу):

class Program
{
    static Stopwatch sw = new Stopwatch();

    static void Main(string[] args)
    {
        var subject = new Subject<int>();
        var timeout = TimeSpan.FromMilliseconds(500);

        subject
            .Throttle(timeout)
            .Subscribe(DoStuff);

        var factory = new TaskFactory();

        sw.Start();

        factory.StartNew(() =>
        {
            Console.WriteLine("Batch 1 (no delay)");
            subject.OnNext(1);
        });

        factory.StartNewDelayed(1000, () =>
        {
            Console.WriteLine("Batch 2 (1s delay)");
            subject.OnNext(2);
        });

        factory.StartNewDelayed(1300, () =>
        {
            Console.WriteLine("Batch 3 (1.3s delay)");
            subject.OnNext(3);
        });

        factory.StartNewDelayed(1600, () =>
        {
            Console.WriteLine("Batch 4 (1.6s delay)");
            subject.OnNext(4);
        });

        Console.ReadKey();
        sw.Stop();
    }

    private static void DoStuff(int i)
    {
        Console.WriteLine("Handling {0} at {1}ms", i, sw.ElapsedMilliseconds);
    }
}

Результат выполнения этого прямо сейчас:

Пакет 1 (без задержки)

Обработка 1 при 508 мс

Пакет 2 (задержка 1 с)

Пакет 3 (задержка 1,3 с)

Пакет 4 (задержка 1,6 с)

Обработка 4 при 2114мс

Обратите внимание, что пакет 2 не обрабатывается (что нормально!), Потому что мы ожидаем, что между запросами пройдет 500 мс из-заприрода дросселя.Партия 3 также не обрабатывается (что не так хорошо, потому что это произошло более чем в 500 мс от партии 2) из-за ее близости к Пакету 4.

То, что я ищу, выглядит примерно так:

Пакет 1 (без задержки)

Обработка 1 при ~ 0 мс

Пакет 2 (с задержкой 1 с)

Обработка 2 при ~ 1000 с

Пакет 3 (с задержкой 1,3 с)

Пакет 4 (с задержкой 1,6 с)

Управление 4 при ~ 1600 с

Обратите внимание, что партия 3 не будет обрабатываться в этом сценарии (что нормально!), Поскольку она происходит в пределах 500 мс от партии 2.

РЕДАКТИРОВАТЬ :

Вот реализация для метода расширения «StartNewDelayed», который я использую:

/// <summary>Creates a Task that will complete after the specified delay.</summary>
/// <param name="factory">The TaskFactory.</param>
/// <param name="millisecondsDelay">The delay after which the Task should transition to RanToCompletion.</param>
/// <returns>A Task that will be completed after the specified duration.</returns>
public static Task StartNewDelayed(
    this TaskFactory factory, int millisecondsDelay)
{
    return StartNewDelayed(factory, millisecondsDelay, CancellationToken.None);
}

/// <summary>Creates a Task that will complete after the specified delay.</summary>
/// <param name="factory">The TaskFactory.</param>
/// <param name="millisecondsDelay">The delay after which the Task should transition to RanToCompletion.</param>
/// <param name="cancellationToken">The cancellation token that can be used to cancel the timed task.</param>
/// <returns>A Task that will be completed after the specified duration and that's cancelable with the specified token.</returns>
public static Task StartNewDelayed(this TaskFactory factory, int millisecondsDelay, CancellationToken cancellationToken)
{
    // Validate arguments
    if (factory == null) throw new ArgumentNullException("factory");
    if (millisecondsDelay < 0) throw new ArgumentOutOfRangeException("millisecondsDelay");

    // Create the timed task
    var tcs = new TaskCompletionSource<object>(factory.CreationOptions);
    var ctr = default(CancellationTokenRegistration);

    // Create the timer but don't start it yet.  If we start it now,
    // it might fire before ctr has been set to the right registration.
    var timer = new Timer(self =>
    {
        // Clean up both the cancellation token and the timer, and try to transition to completed
        ctr.Dispose();
        ((Timer)self).Dispose();
        tcs.TrySetResult(null);
    });

    // Register with the cancellation token.
    if (cancellationToken.CanBeCanceled)
    {
        // When cancellation occurs, cancel the timer and try to transition to cancelled.
        // There could be a race, but it's benign.
        ctr = cancellationToken.Register(() =>
        {
            timer.Dispose();
            tcs.TrySetCanceled();
        });
    }

    if (millisecondsDelay > 0)
    {
        // Start the timer and hand back the task...
        timer.Change(millisecondsDelay, Timeout.Infinite);
    }
    else
    {
        // Just complete the task, and keep execution on the current thread.
        ctr.Dispose();
        tcs.TrySetResult(null);
        timer.Dispose();
    }

    return tcs.Task;
}

Ответы [ 9 ]

12 голосов
/ 27 ноября 2014

Вот мой подход. Это похоже на другие, которые были раньше, но оно не страдает из-за чрезмерного усердия в производстве окон.

Желаемая функция работает так же, как и Observable.Throttle, но выдает квалифицирующие события, как только они приходят, а не с задержкой на время газа или периода выборки. В течение определенного периода времени после квалификационного события последующие события подавляются.

В качестве тестируемого метода расширения:

public static class ObservableExtensions
{
    public static IObservable<T> SampleFirst<T>(
        this IObservable<T> source,
        TimeSpan sampleDuration,
        IScheduler scheduler = null)
    {
        scheduler = scheduler ?? Scheduler.Default;
        return source.Publish(ps => 
            ps.Window(() => ps.Delay(sampleDuration,scheduler))
              .SelectMany(x => x.Take(1)));
    }
}

Идея состоит в том, чтобы использовать перегрузку Window, которая создает непересекающиеся окна, используя windowClosingSelector, который использует источник, смещенный назад на sampleDuration. Следовательно, каждое окно будет: (а) закрываться первым элементом в нем и (б) оставаться открытым до тех пор, пока не будет разрешен новый элемент. Затем мы просто выбираем первый элемент из каждого окна.

Rx 1.x Версия

Используемый выше метод расширения Publish недоступен в Rx 1.x. Вот альтернатива:

public static class ObservableExtensions
{
    public static IObservable<T> SampleFirst<T>(
        this IObservable<T> source,
        TimeSpan sampleDuration,
        IScheduler scheduler = null)
    {
        scheduler = scheduler ?? Scheduler.Default;
        var sourcePub = source.Publish().RefCount();
        return sourcePub.Window(() => sourcePub.Delay(sampleDuration,scheduler))
                        .SelectMany(x => x.Take(1));
    }
}
8 голосов
/ 03 ноября 2011

Решение, которое я нашел после большого количества проб и ошибок, состояло в том, чтобы заменить ограниченную подписку следующим:

subject
    .Window(() => { return Observable.Interval(timeout); })
    .SelectMany(x => x.Take(1))
    .Subscribe(i => DoStuff(i));

Отредактировано для включения очистки Павла.

3 голосов
/ 03 ноября 2011

Отличное решение, Андрей! Мы можем сделать еще один шаг вперед и очистить внутреннюю. Подписаться:

subject
    .Window(() => { return Observable.Interval(timeout); })
    .SelectMany(x => x.Take(1))
    .Subscribe(DoStuff);
2 голосов
/ 04 ноября 2011

Первоначальный ответ, который я опубликовал, имеет недостаток, а именно: метод Window, когда используется с Observable.Interval для обозначения конца окна, устанавливает бесконечный ряд окон размером 500 мс. Что мне действительно нужно, так это окно, которое открывается, когда в результат закачивается первый результат, и заканчивается через 500 мс.

Мой пример данных скрыл эту проблему, потому что данные аккуратно разбились на окна, которые уже собирались создать. (то есть 0-500 мс, 501-1000 мс, 1001-1500 мс и т. д.)

Рассмотрим вместо этого время:

factory.StartNewDelayed(300,() =>
{
    Console.WriteLine("Batch 1 (300ms delay)");
    subject.OnNext(1);
});

factory.StartNewDelayed(700, () =>
{
    Console.WriteLine("Batch 2 (700ms delay)");
    subject.OnNext(2);
});

factory.StartNewDelayed(1300, () =>
{
    Console.WriteLine("Batch 3 (1.3s delay)");
    subject.OnNext(3);
});

factory.StartNewDelayed(1600, () =>
{
    Console.WriteLine("Batch 4 (1.6s delay)");
    subject.OnNext(4);
});

Что я получаю:

Пакет 1 (задержка 300 мс)

Обработка 1 при 356 мс

Пакет 2 (задержка 700 мс)

Обработка 2 при 750 мс

Пакет 3 (задержка 1,3 с)

Обработка 3 при 1346мс

Пакет 4 (с задержкой 1,6 с)

Обработка 4 при 1644 мс

Это потому, что окна начинаются с 0 мс, 500 мс, 1000 мс и 1500 мс, и поэтому каждое Subject.OnNext хорошо вписывается в свое собственное окно.

Что я хочу это:

Пакет 1 (задержка 300 мс)

Обработка 1 при ~ 300 мс

Пакет 2 (задержка 700 мс)

Пакет 3 (задержка 1,3 с)

Обработка 3 при ~ 1300 мс

Пакет 4 (с задержкой 1,6 с)

После долгих попыток и целого часа совместной работы мы пришли к лучшему решению, используя чистый Rx и одну локальную переменную:

bool isCoolingDown = false;

subject
    .Where(_ => !isCoolingDown)
    .Subscribe(
    i =>
    {
        DoStuff(i);

        isCoolingDown = true;

        Observable
            .Interval(cooldownInterval)
            .Take(1)
            .Subscribe(_ => isCoolingDown = false);
    });

Мы предполагаем, что вызовы метода подписки синхронизированы. Если нет, то можно ввести простую блокировку.

0 голосов
/ 09 июля 2019

Это старая публикация, но ни один ответ не может действительно удовлетворить мои потребности, поэтому я даю свое собственное решение:

public static IObservable<T> ThrottleOrImmediate<T>(this IObservable<T> source, TimeSpan delay, IScheduler scheduler)
{
    return Observable.Create<T>((obs, token) =>
    {
        // Next item cannot be send before that time
        DateTime nextItemTime = default;

        return Task.FromResult(source.Subscribe(async item =>
        {
            var currentTime = DateTime.Now;
            // If we already reach the next item time
            if (currentTime - nextItemTime >= TimeSpan.Zero)
            {
                // Following item will be send only after the set delay
                nextItemTime = currentTime + delay;
                // send current item with scheduler
                scheduler.Schedule(() => obs.OnNext(item));
            }
            // There is still time before we can send an item
            else
            {
                // we schedule the time for the following item
                nextItemTime = currentTime + delay;
                try
                {
                    await Task.Delay(delay, token);
                }
                catch (TaskCanceledException)
                {
                    return;
                }

                // If next item schedule was change by another item then we stop here
                if (nextItemTime > currentTime + delay)
                    return;
                else
                {
                    // Set next possible time for an item and send item with scheduler
                    nextItemTime = currentTime + delay;
                    scheduler.Schedule(() => obs.OnNext(item));
                }
            }
        }));

    });
}

Первый элемент немедленно отправляется, затем следующие элементы удушаются.Затем, если следующий элемент будет отправлен позже установленного времени, он также будет немедленно отправлен.

0 голосов
/ 20 апреля 2015

Используйте .Scan()!Это то, что я использую для Throttling, когда мне нужно сразу первый удар (после определенного периода), но задерживаю (и группирую / игнорирую) любые последующие попадания.В основном работает как Throttle, но срабатывает немедленно, если предыдущий onNext был> = interval назад, в противном случае запланируйте его точно на interval от предыдущего попадания.И, конечно же, если в течение периода «охлаждения» происходит многократные попадания, дополнительные игнорируются, как это делает Throttle.Разница с вашим вариантом использования заключается в том, что если вы получите событие в 0 мс и 100 мс, они оба будут обрабатываться (в 0 мс и 500 мс), что может быть тем, что вы на самом деле хотите (в противном случае аккумулятор легко адаптировать, чтобы игнорироватьЛЮБОЙ удар ближе чем interval к предыдущему).

public static IObservable<T> QuickThrottle<T>(this IObservable<T> src, TimeSpan interval, IScheduler scheduler)
{
  return src
    .Scan(new ValueAndDueTime<T>(), (prev, id) => AccumulateForQuickThrottle(prev, id, interval, scheduler))
    .Where(vd => !vd.Ignore)
    .SelectMany(sc => Observable.Timer(sc.DueTime, scheduler).Select(_ => sc.Value));
}

private static ValueAndDueTime<T> AccumulateForQuickThrottle<T>(ValueAndDueTime<T> prev, T value, TimeSpan interval, IScheduler s)
{
  var now = s.Now;

  // Ignore this completely if there is already a future item scheduled
  //  but do keep the dueTime for accumulation!
  if (prev.DueTime > now) return new ValueAndDueTime<T> { DueTime = prev.DueTime, Ignore = true };

  // Schedule this item at at least interval from the previous
  var min = prev.DueTime + interval;
  var nextTime = (now < min) ? min : now;
  return new ValueAndDueTime<T> { DueTime = nextTime, Value = value };
}

private class ValueAndDueTime<T>
{
  public DateTimeOffset DueTime;
  public T Value;
  public bool Ignore;
}
0 голосов
/ 25 июля 2013

Я наткнулся на этот вопрос, пытаясь повторно реализовать свое собственное решение той же или аналогичной проблемы, используя .Window Посмотрите, кажется, он такой же, как этот, и решен довольно элегантно:

https://stackoverflow.com/a/3224723/58463

0 голосов
/ 10 ноября 2011

У меня есть еще один для тебя.Этот не использует Repeat () и Interval (), так что это может быть то, что вы после:

subject
    .Window(() => Observable.Timer(TimeSpan.FromMilliseconds(500)))
    .SelectMany(x => x.Take(1));
0 голосов
/ 09 ноября 2011

Что ж, наиболее очевидной вещью будет использование здесь Repeat (). Однако, насколько я знаю, Repeat () может создавать проблемы, так что уведомления исчезают между моментом, когда поток останавливается, и мы снова подписываемся. На практике это никогда не было проблемой для меня.

subject
    .Take(1)
    .Concat(Observable.Empty<long>().Delay(TimeSpan.FromMilliseconds(500)))
    .Repeat();

Не забудьте заменить на фактический тип вашего источника.

UPDATE:

Обновлен запрос для использования Concat вместо Merge

...