Можно ли решить эту проблему параллелизма с помощью async / await? - PullRequest
1 голос
/ 08 февраля 2020

У меня есть такая функция:

static void AddResultsToDb(IEnumerable<int> numbers)
{
    foreach (int number in numbers)
    {
        int result = ComputeResult(number); // This takes a long time, but is thread safe.
        AddResultToDb(number, result); // This is quick but not thread safe.
    }
}

Я мог бы решить эту проблему, используя, например, Parallel.ForEach для вычисления результатов, а затем использовать обычный foreach для добавления результатов. в базу данных.

Однако, в образовательных целях, Я бы хотел решение, которое вращается вокруг await / asyn c. Но независимо от того, как много я читаю об этом, я не могу сосредоточиться на этом. Если await / asyn c не применимо в этом контексте, я хотел бы понять, почему.

Ответы [ 4 ]

3 голосов
/ 08 февраля 2020

Как и предполагали другие, это не тот случай использования async / await, как для асинхронности. То, что вы делаете, это параллелизм. У Microsoft есть специально для этого фреймворк, и она прекрасно решает эту проблему.

Так что в учебных целях вы должны использовать Microsoft Reactive Framework (он же Rx) - NuGet System.Reactive и добавить using System.Reactive.Linq; - тогда вы можете сделать this:

static void AddResultsToDb(IEnumerable<int> numbers)
{
    numbers 
        .ToObservable()
        .SelectMany(n => Observable.Start(() => new { n, r = ComputeResult(n) }))
        .Do(x => AddResultToDb(x.n, x.r))
        .Wait();
}

Комбинация SelectMany / Observable.Start позволяет одновременно выполнять столько вызовов ComputeResult, сколько возможно. Хорошая особенность Rx состоит в том, что он затем сериализует результаты, так что только один вызов за раз переходит к AddResultToDb.


Для управления степенями параллелизма вы можете изменить SelectMany на Select / Merge вот так:

static void AddResultsToDb(IEnumerable<int> numbers)
{
    numbers 
        .ToObservable()
        .Select(n => Observable.Start(() => new { n, r = ComputeResult(n) }))
        .Merge(maxConcurrent: 2)
        .Do(x => AddResultToDb(x.n, x.r))
        .Wait();
}
3 голосов
/ 08 февраля 2020

asyn c и шаблон ожидания не очень подходят для вашего первого метода. Он хорошо подходит для ограниченных рабочих нагрузок ввода-вывода для достижения масштабируемости или для каркасов, которые имеют пользовательский интерфейс для отзывчивости. Он менее подходит для необработанных рабочих нагрузок процессора .

Однако вы все равно можете получить выгоду от параллельной обработки , потому что ваш первый метод дорогой и поточно-безопасный .

В следующем примере я использовал Parallel LINQ (PLINQ) для быстрого выражения результатов, не беспокоясь о массиве pre-size / одновременной коллекции / блокировка , хотя вы можете использовать другие функции TPL , например Parallel.For/ForEach

// Potentially break up the workloads in parallel
// return the number and result in a ValueTuple
var results = numbers.AsParallel()
                     .Select(x => (number: x, result: ComputeResult(x)))
                     .ToList();

// iterate through the number and results and execute them serially 
foreach (var (number, result) in results)
   AddResultToDb(number, result);

Примечание : предположение, что порядок здесь не важен


Дополнительный

Ваш метод AddResultToDb выглядит так, как будто он просто вставляется результаты в базу данных , которая IO Bound и достойна async, более того, вероятно, может взять все результаты сразу и вставить их в bulk / batch экономия поездки туда и обратно


Из комментариев кредит @ TheodorZoulias

До сохранить Для заказа вы можете использовать метод AsOrdered за счет некоторого снижения производительности. Возможное улучшение производительности - удалить ToList(), чтобы результаты добавлялись в БД одновременно с вычислениями.

Чтобы сделать результаты доступными как можно быстрее, вероятно, хорошей идеей будет отключение частичного буферизация, которая происходит по умолчанию , путем цепочки метода .WithMergeOptions(ParallelMergeOptions.NotBuffered) в запросе

var results = numbers.AsParallel()
                     .Select(x => (number: x, result: ComputeResult(x)))
                     .WithMergeOptions(ParallelMergeOptions.NotBuffered)
                     .AsOrdered();

Пример


Дополнительные ресурсы

ParallelEnumerable.AsOrdered Method

Включает обработку источника данных, как если бы он был упорядочен, переопределяя значение по умолчанию для неупорядоченных , AsOrdered может вызываться только для неуниверсальных последовательностей

ParallelEnumerable.WithMergeOptions

Устанавливает параметры слияния для этого запроса, которые определяют, как запрос будет буферизовать вывод.

ParallelMergeOptions Enum

NotBuffered Использовать слияние без выходных буферов. Как только будут вычислены элементы результата, сделайте этот элемент доступным для потребителя запроса.

1 голос
/ 08 февраля 2020

Это на самом деле не так для async/await, потому что похоже, что ComputeResult дорого в вычислительном отношении, а не просто занимает много времени. aync/await лучше для задач, которые вы действительно ожидаете. Parallel.ForEach фактически приведет к потоке вашей рабочей нагрузки.

Если что, AddResultToDb - это то, что вы хотите асинхронизировать / ожидать - вы ожидаете завершения внешнего действия.

Хорошо в Глубинное объяснение: { ссылка }

0 голосов
/ 08 февраля 2020

Честно говоря, использование Parallel.For кажется самым простым решением, поскольку ваши вычисления, скорее всего, будут привязаны к процессору. Async / await лучше подходит для операций, связанных с вводом / выводом, поскольку для него не требуется, чтобы другой поток ожидал завершения операции ввода / вывода (см. нет потока ).

Тем не менее, вы все еще можете использовать async / await для задач, которые вы помещаете в пул потоков. Вот как ты мог это сделать.

static void AddResultToDb(int number)
{
    int result = ComputeResult(number); 
    AddResultToDb(number, result); 
}

static async Task AddResultsToDb(IEnumerable<int> numbers)
{
    var tasks = numbers.Select
    (
        number => Task.Run( () => AddResultToDb(number) )  
    )
    .ToList();

    await Task.WhenAll(tasks);
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...