Как принимать только последние наблюдаемые значения, используя ObserveLatestOn, при подписке с помощью асинхронного метода? - PullRequest
0 голосов
/ 04 июня 2019

У меня проблема с обратным давлением в моем приложении, которое получает обновления для кэшированных элементов через наблюдаемые RX и обрабатывает их с помощью продолжительного асинхронного метода. Я бы хотел пакетировать обновления и вызывать Task.WhenAll () для одновременной работы с пакетом до его завершения. Затем я хотел бы отбросить все обновления, полученные за это время, и продолжить обработку следующего пакета, содержащего только последние полученные обновления.

Я изо всех сил пытаюсь получить упрощенную (т.е. без буферизации) версию, работающую с использованием метода ObserveLatestOn из этого поста и асинхронной подписки (т.е. Task.Delay () для имитации длительного вызова).

Пожалуйста, кто-нибудь может объяснить, что я делаю неправильно или как это сделать? (в идеале с RX, но другие решения, основанные на задачах / очередях, будут в порядке)

    public async Task Test()
    {
        ConcurrentQueue<string> messageQueue = new ConcurrentQueue<string>();

        var testSub = Observable.Interval(TimeSpan.FromMilliseconds(250))
        .Do(x => messageQueue.Enqueue("source: " + x))
        .ObserveLatestOn()
        .Subscribe(i => 
        { 
            messageQueue.Enqueue($"[{i}]");
            Thread.Sleep(TimeSpan.FromSeconds(1));
        });

        // wait for test to finish
        await Task.Delay(TimeSpan.FromSeconds(4));
        testSub.Dispose();
        Console.WriteLine(string.Join(Environment.NewLine, messageQueue));          
    }

Например, вышеприведенный не асинхронный тест с Thread.Sleep () дает ожидаемый результат с пропущенными значениями подписки и видит только 0,4,8 ...

источник: 0 [0] источник: 1 источник: 2 источник: 3 источник: 4 [4] источник: 5 источник: 6 ...

Но когда я пытаюсь использовать Task.Delay с SubscribeAsync, взятым из здесь :

        var testSub = Observable.Interval(TimeSpan.FromMilliseconds(250))
            .Do(x => messageQueue.Enqueue("source: " + x))
            .ObserveLatestOn()
            .SubscribeAsync(async i =>
            {
                messageQueue.Enqueue($"[{i}]");
                await Task.Delay(TimeSpan.FromSeconds(1));
            });

        public static IDisposable SubscribeAsync<T>(this IObservable<T> source, Func<T, Task> onNextAsync)
        {
            return source.Select(number => Observable.FromAsync(async ct => await onNextAsync(number))).Concat().Subscribe();
        }

Вывод:

источник: 0 [0] источник: 1 источник: 2 источник: 3 источник: 4 [1] источник: 5 источник: 6 источник: 7 источник: 8 [2] ...

и подписка получает значения 0,1,2 ... значения не удаляются!

...