У меня проблема с обратным давлением в моем приложении, которое получает обновления для кэшированных элементов через наблюдаемые RX и обрабатывает их с помощью продолжительного асинхронного метода. Я бы хотел пакетировать обновления и вызывать Task.WhenAll () для одновременной работы с пакетом до его завершения. Затем я хотел бы отбросить все обновления, полученные за это время, и продолжить обработку следующего пакета, содержащего только последние полученные обновления.
Я изо всех сил пытаюсь получить упрощенную (т.е. без буферизации) версию, работающую с использованием метода ObserveLatestOn из этого поста и асинхронной подписки (т.е. Task.Delay () для имитации длительного вызова).
Пожалуйста, кто-нибудь может объяснить, что я делаю неправильно или как это сделать? (в идеале с RX, но другие решения, основанные на задачах / очередях, будут в порядке)
public async Task Test()
{
ConcurrentQueue<string> messageQueue = new ConcurrentQueue<string>();
var testSub = Observable.Interval(TimeSpan.FromMilliseconds(250))
.Do(x => messageQueue.Enqueue("source: " + x))
.ObserveLatestOn()
.Subscribe(i =>
{
messageQueue.Enqueue($"[{i}]");
Thread.Sleep(TimeSpan.FromSeconds(1));
});
// wait for test to finish
await Task.Delay(TimeSpan.FromSeconds(4));
testSub.Dispose();
Console.WriteLine(string.Join(Environment.NewLine, messageQueue));
}
Например, вышеприведенный не асинхронный тест с Thread.Sleep () дает ожидаемый результат с пропущенными значениями подписки и видит только 0,4,8 ...
источник: 0 [0] источник: 1 источник: 2 источник: 3 источник: 4 [4] источник: 5 источник: 6 ...
Но когда я пытаюсь использовать Task.Delay с SubscribeAsync, взятым из здесь :
var testSub = Observable.Interval(TimeSpan.FromMilliseconds(250))
.Do(x => messageQueue.Enqueue("source: " + x))
.ObserveLatestOn()
.SubscribeAsync(async i =>
{
messageQueue.Enqueue($"[{i}]");
await Task.Delay(TimeSpan.FromSeconds(1));
});
public static IDisposable SubscribeAsync<T>(this IObservable<T> source, Func<T, Task> onNextAsync)
{
return source.Select(number => Observable.FromAsync(async ct => await onNextAsync(number))).Concat().Subscribe();
}
Вывод:
источник: 0 [0] источник: 1 источник: 2 источник: 3 источник: 4 [1] источник: 5 источник: 6
источник: 7 источник: 8 [2] ...
и подписка получает значения 0,1,2 ... значения не удаляются!