Есть ли что-то вроде ThrottleWhile или SubscribeAsyn c в реактивном состоянии? - PullRequest
1 голос
/ 07 августа 2020

У меня есть асинхронная подписка на logi c, и когда она запустится, я не хочу, чтобы она запускалась снова, пока эта logi c не будет завершена. Но если во время работы logi c наблюдается какое-либо событие, оно будет дросселироваться до тех пор, пока подписка logi c не будет завершена, а затем немедленно сработает снова

Итак, я wi sh, что я мог напишите

observable.SubscribeAsync(async(data) => await DoSomething(data))`

А точнее. Я хочу ограничить входные данные, не пропуская и не ожидая, а ограничивая их. Поэтому я думаю, что было бы лучше, если бы у нас был ThrottleWhile (и, возможно, также DebounceWhile или ThrottleUntil)

bool running = false;
observable.ThrottleWhile((_) => running).Subscribe((data) => {
    try
    {
        running = true;
        await DoSomething(data);
    }
    finally
    {
        running = false;
    }
})`

Возможно ли это уже? Как я могу создать этот поток по реактивной схеме?

Ответы [ 2 ]

2 голосов
/ 11 августа 2020

Я не следил за библиотекой, поэтому, возможно, был добавлен встроенный оператор, но я не знаю ни одного в той версии, которую я использую. Вот основная c версия того, что звучит так, как будто вы просили:

static IDisposable SubscribeAsync<T>(this IObservable<T> source, Func<T, Task> onNext,
                                     Action<Exception> onError, Action onComplete)
{
    //argument error checking omitted for brevity
    T current = default(T);
    bool processing = false;
    bool haveCurrent = false;

    return source
           .Where((v) =>
                  {
                      if (processing)
                      {
                          current = v;
                          haveCurrent = true;
                      }
                      return !processing;
                  })
           .Subscribe((v) =>
                      {
                          Action<Task> runNext = null;
                          runNext = (task) =>
                              {
                                  if (haveCurrent)
                                  {
                                      haveCurrent = false;
                                      onNext(current).ContinueWith(runNext);
                                  }
                                  else
                                  {
                                      processing = false;
                                  }
                              };
                          processing = true;
                          onNext(v).ContinueWith(runNext);
                      },
                      onError,
                      onComplete);
}

Некоторые примечания, которые следует учитывать при этой реализации:

Последний вызов onNext может произойти после вызова в onComplete, если последний элемент и завершение происходят во время обработки предыдущего элемента. Точно так же onComplete может вызываться во время выполнения задачи onNext. Это может не быть проблемой, если ваш исходный код никогда не завершается. Если это проблема, вам нужно будет решить, хотите ли вы отложить вызов onComplete до тех пор, пока не будет обработан последний элемент, или отменить обработку последнего элемента и внести соответствующие изменения.

Я не пытался тест на любые условия гонки, которые могут или не могут возникнуть, когда элементы поступают правильно после завершения прогона обработки.

1 голос
/ 12 августа 2020

Как насчет чего-то вроде этого:

public static IObservable<TResult> SelectAsync<TSource, TResult>(this IObservable<TSource> source, Func<TSource, Task<TResult>> projectAsync)
{
    return Observable.Create<TResult>(
        observer =>
        {
            var throttle = new BehaviorSubject<TResult>(default);

            var observable = source
                .Zip(throttle, (value, _) => value)
                .SelectMany(value => Observable.Defer(() => Observable.StartAsync(() => projectAsync(value))))
                .Publish();

            return new CompositeDisposable(
                observable.Subscribe(throttle),
                observable.Subscribe(observer),
                observable.Connect(),
                throttle
            );
        }
    );
}

В этом методе расширения Zip в сочетании с BehaviorSubject образуют дроссель, в котором элементы ставятся в очередь до тех пор, пока projectAsync не будет завершено.

Затем его можно использовать следующим образом:

public static async Task<Unit> DoSomethingAsync(int value)
{
    Console.WriteLine($"[{Scheduler.Default.Now.DateTime.ToString("hh:mm:ss")}] Started processing value '{value}'");

    await Task.Delay(TimeSpan.FromSeconds(1));

    Console.WriteLine($"[{Scheduler.Default.Now.DateTime.ToString("hh:mm:ss")}] Completed processing value '{value}'");

    return Unit.Default;
}

public static async Task RunAsync()
{
    IObservable<int> source = Observable.Generate(0, value => value < 25, value => value + 1, value => value, value => TimeSpan.FromSeconds(0.1));

    await source
            .Do(value => Console.WriteLine($"[{Scheduler.Default.Now.DateTime.ToString("hh:mm:ss")}] Received value '{value}'"))
            .SelectAsync(value => DoSomethingAsync(value))
            .ToTask();
}

При этом создается наблюдаемый источник, который генерирует 25 элементов с интервалами 100 мс. В методе DoSomethingAsync используется Task.Delay для имитации задержки обработки в 1 секунду. Выполнение этого кода должно привести к следующему результату:

[02:07:56] Received value '0'
[02:07:56] Started processing value '0'
[02:07:56] Received value '1'
[02:07:56] Received value '2'
[02:07:57] Received value '3'
[02:07:57] Received value '4'
[02:07:57] Received value '5'
[02:07:57] Received value '6'
[02:07:57] Received value '7'
[02:07:57] Received value '8'
[02:07:57] Received value '9'
[02:07:57] Completed processing value '0'
[02:07:57] Started processing value '1'
[02:07:57] Received value '10'
[02:07:57] Received value '11'
[02:07:58] Received value '12'
[02:07:58] Received value '13'
[02:07:58] Received value '14'
[02:07:58] Received value '15'
[02:07:58] Received value '16'
[02:07:58] Received value '17'
[02:07:58] Received value '18'
[02:07:58] Completed processing value '1'
[02:07:58] Started processing value '2'
[02:07:58] Received value '19'
[02:07:58] Received value '20'
[02:07:59] Received value '21'
[02:07:59] Received value '22'
[02:07:59] Received value '23'
[02:07:59] Received value '24'
[02:07:59] Completed processing value '2'
[02:07:59] Started processing value '3'
[02:08:00] Completed processing value '3'
[02:08:00] Started processing value '4'
[02:08:01] Completed processing value '4'
...
[02:08:20] Started processing value '23'
[02:08:21] Completed processing value '23'
[02:08:21] Started processing value '24'
[02:08:22] Completed processing value '24'

Вы должны знать, что этот код не предоставляет никаких средств противодействия источнику, поэтому, если источник постоянно генерирует элементы быстрее, чем projectAsync давление памяти будет нарастать (через организацию очереди в операторе Zip), пока вы не получите исключение нехватки памяти.

Кроме того, хотя я не знаю варианта использования этого требования, вы можете захотеть подумайте, что здесь лучше: "System.Interactive.Asyn c" или "System.Threading.Tasks.DataFlow".

...