Обработка задач параллельно, но последовательно внутри каждой группы - PullRequest
0 голосов
/ 22 февраля 2019

У меня есть поток запросов, которые могут обрабатываться асинхронно параллельно, если они не имеют одинаковый ключ.Все запросы с одним и тем же ключом должны обрабатываться последовательно.

Что я сделал, я обернул ленту запросов в IObservable<Request>, сгруппировал по ключу и использовал трюк с Observable.Scan для выполнения запросов в одной группе.обрабатываться параллельно:

var feed = requests
    .GroupBy(request => request.RequestKey)
    .SelectMany(requestGroup => requestGroup
        .Scan(
            Task.FromResult(Unit.Default),
            async (previousTask, request) =>
            {
                await previousTask;
                await httpClient.PostAsJsonAsync("some/url", request);

                return Unit.Default;
            })
        .Merge());

using (feed.Subscribe())
    Console.ReadLine();    // Keep on processing

Некоторый фон: имеется большое число (~ 10000) возможных значений RequestKey, которые в 99% не пересекаются в течениепараллельная обработка.

Есть ли в этом решении некоторые недостатки?Может быть, есть более аккуратный, который, возможно, не о Rx?

...