Я хотел бы настроить подписку Rx, которая может сразу реагировать на событие, а затем игнорировать последующие события, которые происходят в течение указанного периода «перезарядки».
Стандартные методы Throttle / Buffer отвечают только по истечении времени ожидания, что не совсем то, что мне нужно.
Вот код, который устанавливает сценарий и используетДроссель (что не является решением, которое я хочу):
class Program
{
static Stopwatch sw = new Stopwatch();
static void Main(string[] args)
{
var subject = new Subject<int>();
var timeout = TimeSpan.FromMilliseconds(500);
subject
.Throttle(timeout)
.Subscribe(DoStuff);
var factory = new TaskFactory();
sw.Start();
factory.StartNew(() =>
{
Console.WriteLine("Batch 1 (no delay)");
subject.OnNext(1);
});
factory.StartNewDelayed(1000, () =>
{
Console.WriteLine("Batch 2 (1s delay)");
subject.OnNext(2);
});
factory.StartNewDelayed(1300, () =>
{
Console.WriteLine("Batch 3 (1.3s delay)");
subject.OnNext(3);
});
factory.StartNewDelayed(1600, () =>
{
Console.WriteLine("Batch 4 (1.6s delay)");
subject.OnNext(4);
});
Console.ReadKey();
sw.Stop();
}
private static void DoStuff(int i)
{
Console.WriteLine("Handling {0} at {1}ms", i, sw.ElapsedMilliseconds);
}
}
Результат выполнения этого прямо сейчас:
Пакет 1 (без задержки)
Обработка 1 при 508 мс
Пакет 2 (задержка 1 с)
Пакет 3 (задержка 1,3 с)
Пакет 4 (задержка 1,6 с)
Обработка 4 при 2114мс
Обратите внимание, что пакет 2 не обрабатывается (что нормально!), Потому что мы ожидаем, что между запросами пройдет 500 мс из-заприрода дросселя.Партия 3 также не обрабатывается (что не так хорошо, потому что это произошло более чем в 500 мс от партии 2) из-за ее близости к Пакету 4.
То, что я ищу, выглядит примерно так:
Пакет 1 (без задержки)
Обработка 1 при ~ 0 мс
Пакет 2 (с задержкой 1 с)
Обработка 2 при ~ 1000 с
Пакет 3 (с задержкой 1,3 с)
Пакет 4 (с задержкой 1,6 с)
Управление 4 при ~ 1600 с
Обратите внимание, что партия 3 не будет обрабатываться в этом сценарии (что нормально!), Поскольку она происходит в пределах 500 мс от партии 2.
РЕДАКТИРОВАТЬ :
Вот реализация для метода расширения «StartNewDelayed», который я использую:
/// <summary>Creates a Task that will complete after the specified delay.</summary>
/// <param name="factory">The TaskFactory.</param>
/// <param name="millisecondsDelay">The delay after which the Task should transition to RanToCompletion.</param>
/// <returns>A Task that will be completed after the specified duration.</returns>
public static Task StartNewDelayed(
this TaskFactory factory, int millisecondsDelay)
{
return StartNewDelayed(factory, millisecondsDelay, CancellationToken.None);
}
/// <summary>Creates a Task that will complete after the specified delay.</summary>
/// <param name="factory">The TaskFactory.</param>
/// <param name="millisecondsDelay">The delay after which the Task should transition to RanToCompletion.</param>
/// <param name="cancellationToken">The cancellation token that can be used to cancel the timed task.</param>
/// <returns>A Task that will be completed after the specified duration and that's cancelable with the specified token.</returns>
public static Task StartNewDelayed(this TaskFactory factory, int millisecondsDelay, CancellationToken cancellationToken)
{
// Validate arguments
if (factory == null) throw new ArgumentNullException("factory");
if (millisecondsDelay < 0) throw new ArgumentOutOfRangeException("millisecondsDelay");
// Create the timed task
var tcs = new TaskCompletionSource<object>(factory.CreationOptions);
var ctr = default(CancellationTokenRegistration);
// Create the timer but don't start it yet. If we start it now,
// it might fire before ctr has been set to the right registration.
var timer = new Timer(self =>
{
// Clean up both the cancellation token and the timer, and try to transition to completed
ctr.Dispose();
((Timer)self).Dispose();
tcs.TrySetResult(null);
});
// Register with the cancellation token.
if (cancellationToken.CanBeCanceled)
{
// When cancellation occurs, cancel the timer and try to transition to cancelled.
// There could be a race, but it's benign.
ctr = cancellationToken.Register(() =>
{
timer.Dispose();
tcs.TrySetCanceled();
});
}
if (millisecondsDelay > 0)
{
// Start the timer and hand back the task...
timer.Change(millisecondsDelay, Timeout.Infinite);
}
else
{
// Just complete the task, and keep execution on the current thread.
ctr.Dispose();
tcs.TrySetResult(null);
timer.Dispose();
}
return tcs.Task;
}