System.Reactive Регулирование асинхронного метода - PullRequest
0 голосов
/ 25 мая 2018

Я так долго откладывал использование реактивных расширений, и я подумал, что это было бы неплохо.Проще говоря, у меня есть метод, который можно вызывать по разным причинам на различных путях кода

private async Task GetProductAsync(string blah) {...}

Мне нужно иметь возможность Throttle этого метода.То есть я хочу остановить поток вызовов до тех пор, пока не будет сделано больше вызовов (в течение определенного периода времени).Или, если быть более точным, если 10 вызовов этого метода произойдут в течение определенного периода времени, я хочу ограничить (ограничить) его только 1 вызовом (после периода), когда был сделан последний вызов.

Я вижупример использования метода с IEnumerable, этот вид имеет смысл

static IEnumerable<int> GenerateAlternatingFastAndSlowEvents() 
{ ... }

...

var observable = GenerateAlternatingFastAndSlowEvents().ToObservable().Timestamp();
var throttled = observable.Throttle(TimeSpan.FromMilliseconds(750));

using (throttled.Subscribe(x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp)))
{
    Console.WriteLine("Press any key to unsubscribe");
    Console.ReadKey();
}

Console.WriteLine("Press any key to exit");
Console.ReadKey();

Однако (и это всегда было моей главной проблемой с Rx, навсегда), как мне создать Observable изпростой async метод.

Обновление

Мне удалось найти альтернативный подход с использованием ReactiveProperty

Barcode = new ReactiveProperty<string>();
Barcode.Select(text => Observable.FromAsync(async () => await GetProductAsync(text)))
       .Throttle(TimeSpan.FromMilliseconds(1000))
       .Switch()
       .ToReactiveProperty(); 

Суть в том, что я ловлю егов текстовом свойстве Barcode, однако у него есть свои недостатки, так как ReactiveProperty заботится об уведомлении, и я не могу молча обновить поле поддержки, поскольку оно уже управляется.

Так что еще раз, если все равно не могупомогите мне преобразовать вызов метода async в Observable каким-то образом, так что я могу использовать метод Throttle, который будет более чем смешным

Ответы [ 2 ]

0 голосов
/ 25 мая 2018

Не имеет отношения к вашему вопросу, но, вероятно, полезно: оператор Rx Throttle действительно является оператором debounce.Самая близкая вещь к оператору регулирования - Sample.Вот разница (при условии, что вы хотите уменьшить или отменить до одного элемента / 3 секунды):

items   : --1-23----4-56-7----8----9-
throttle: --1--3-----4--6--7--8-----9
debounce: --1-------4--6------8----9-

Sample / throttle объединит элементы, которые прибывают в чувствительное время, и испустит последний на следующемтик выборки.Дебоуд отбрасывает предметы, которые прибывают в чувствительное время, а затем перезапускает часы: единственный способ испустить предмет - если ему предшествовал временной интервал молчания.

Оператор Throttle RX.Net делает то, что изображено debounce выше.Sample делает то, что изображено throttle выше.

Если вы хотите что-то другое, опишите, как вы хотите задушить.

0 голосов
/ 25 мая 2018

Существует два основных способа преобразования Задачи в Наблюдаемую, с важным отличием между ними.

Observable.FromAsync(()=>GetProductAsync("test"));

и

GetProductAsync("test").ToObservable();

Первый не запустит Задачу, покаВы подписываетесь на это.Второй создаст (и запустит) задачу, и результат будет немедленно или через некоторое время отображаться в наблюдаемой, в зависимости от того, насколько быстро задача.

Хотя, глядя на ваш вопрос в целом, кажется, что выхотите остановить поток вызовов. Вы не хотите ограничивать поток результатов, что приведет к ненужным вычислениям и потере.

Если это ваша цель, ваш GetProductAsync может бытьрассматривается как наблюдатель событий вызова, и GetProductAsync должен регулировать эти вызовы.Одним из способов достижения этого было бы объявить

public event Action<string> GetProduct;

и использовать

  var callStream= Observable.FromEvent<string>( 
             handler =>  GetProduct+= handler , 
             handler => GetProduct-= handler);

Тогда возникает проблема, как вернуть результат и что должно произойти, когда вызов вашего вызывающего абонентаограничены и отброшены.

Один из подходов может заключаться в объявлении типа «GetProductCall», который будет иметь входную строку и выходной результат в качестве свойств.

После этого вы можете получить такую ​​настройку, как:

var callStream= Observable.FromEvent<GetProductCall>( 
             handler =>  GetProduct+= handler , 
             handler => GetProduct-= handler)
            .Throttle(...)
            .Select(r=>async r.Result= await GetProductCall(r.Input).ToObservable().FirstAsync());

(код не проверен, только иллюстративный)

Другой подход может включать перегрузку Merge (N), которая ограничиваетмаксимальное количество одновременных наблюдаемых.

...