У меня есть поток запросов, которые могут обрабатываться асинхронно параллельно, если они не имеют одинаковый ключ.Все запросы с одним и тем же ключом должны обрабатываться последовательно.
Что я сделал, я обернул ленту запросов в IObservable<Request>
, сгруппировал по ключу и использовал трюк с Observable.Scan
для выполнения запросов в одной группе.обрабатываться параллельно:
var feed = requests
.GroupBy(request => request.RequestKey)
.SelectMany(requestGroup => requestGroup
.Scan(
Task.FromResult(Unit.Default),
async (previousTask, request) =>
{
await previousTask;
await httpClient.PostAsJsonAsync("some/url", request);
return Unit.Default;
})
.Merge());
using (feed.Subscribe())
Console.ReadLine(); // Keep on processing
Некоторый фон: имеется большое число (~ 10000) возможных значений RequestKey
, которые в 99% не пересекаются в течениепараллельная обработка.
Есть ли в этом решении некоторые недостатки?Может быть, есть более аккуратный, который, возможно, не о Rx?