C #: ограничить максимум одновременной работы с Parallel.ForEach и асинхронным действием - PullRequest
0 голосов
/ 14 сентября 2018

Я пытаюсь реализовать самодостаточный веб-сервис, используя ядро ​​asp.net 2.1, и столкнулся с проблемой реализации фоновых задач долговременного выполнения.

Из-за высокой загрузки процессора и сокращения времениДля каждого метода ProcessSingle (в фрагменте кода ниже) я хотел бы ограничить количество одновременных задач.Но, как я вижу, все задачи в Parallel.ForEach запускаются практически сразу, несмотря на то, что я установил MaxDegreeOfParallelism = 3

Мой код (это упрощенная версия):

public static async Task<int> Work()
{
    var id = await CreateIdInDB() // async create record in DB

    // run background task, don't wait when it finishes
    Task.Factory.StartNew(async () => {
        Parallel.ForEach(
            listOfData,
            new ParallelOptions { CancellationToken = token, MaxDegreeOfParallelism = 3 },
            async x => await ProcessSingle(x));
    });

    // return created id immediately
    return id;
}

public static async Task ProcessSingle(MyInputData inputData)
{
    var dbData = await GetDataFromDb(); // get data from DB async using Dapper
    // some lasting processing (sync)
    await SaveDataToDb(); // async save processed data to DB using Dapper
}

ЕслиЯ правильно понимаю, проблема в async x => await ProcessSingle(x) внутри Parallel.ForEach, не так ли?

Может кто-нибудь описать, пожалуйста, как это должно быть реализовано правильно?

Обновление

Из-за некоторой двусмысленности в моем вопросе необходимо сосредоточиться на основных аспектах:

  1. В ProcessSingle есть три частиметод:

    • получение данных из асинхронной БД

    • выполнение длительных математических вычислений с высокой загрузкой ЦП

    • сохранить результаты в асинхронной БД

  2. Проблема состоит из двух отдельных:

    • Как уменьшить использование ЦП(например, запустив не более трех математических вычислений одновременно)?

    • Как сохранить структуру метода ProcessSingle - сохранить them async из-за асинхронных вызовов БД.

Надеюсь, это будет более понятно сейчас.

PS Подходящий ответ уже дан, этоработает (особенно благодаря @MatrixTai).Это обновление было написано для общего разъяснения.

Ответы [ 2 ]

0 голосов
/ 14 сентября 2018

Если вы более знакомы с «традиционной» концепцией параллельной обработки, перепишите ваш метод ProcessSingle () следующим образом:

public static void ProcessSingle(MyInputData inputData)
{
    var dbData = GetDataFromDb(); // get data from DB async using Dapper
    // some lasting processing (sync)
    SaveDataToDb(); // async save processed data to DB using Dapper
}

Конечно, вы бы также предпочтительно изменили метод Work () аналогичным образом.

0 голосов
/ 14 сентября 2018

Обновление

Как я только что заметил, вы упомянули в комментарии, что проблема вызвана математическим вычислением.

Будет лучше отделить часть вычисления иОбновление БД.

Для расчетной части используйте Parallel.ForEach(), чтобы оптимизировать свою работу, и вы можете контролировать номер потока.

И только после того, как все эти задачи завершены.Используйте async-await для обновления ваших данных в БД без SemaphoreSlim, о котором я упоминал.

public static async Task<int> Work()
{
    var id = await CreateIdInDB() // async create record in DB

    // run background task, don't wait when it finishes
    Task.Run(async () => {

        //Calculation Part
        ConcurrentBag<int> data = new ConcurrentBag<int>();
        Parallel.ForEach(
            listOfData,
            new ParallelOptions { CancellationToken = token, MaxDegreeOfParallelism = 3 },
            x => {ConcurrentBag.Add(calculationPart(x))});

        //Update DB part
        int[] data_arr = data.ToArray();
        List<Task> worker = new List<Task>();
        foreach (var i in data_arr)
        {
            worker.Add(DBPart(x));
        }
        await Task.WhenAll(worker);
    });

    // return created id immediately
    return id;
}

Убедитесь, что все они начинаются вместе, так как вы используете async-await в Parallel.forEach.

Сначала прочитайте об этом вопросе для 1-го и 2-го ответов.Объединять эти два смысла бессмысленно.

На самом деле async-await максимизирует использование доступного потока, поэтому просто используйте его.

public static async Task<int> Work()
{
    var id = await CreateIdInDB() // async create record in DB

    // run background task, don't wait when it finishes
    Task.Run(async () => {
        List<Task> worker = new List<Task>();
        foreach (var i in listOfData)
        {
            worker.Add(ProcessSingle(x));
        }
        await Task.WhenAll(worker);
    });

    // return created id immediately
    return id;
}

Но тогда возникает другая проблема, в этом случае эти задачи все еще начинаются все вместе, съедаяиспользование вашего процессора.

Чтобы избежать этого, используйте SemaphoreSlim

public static async Task<int> Work()
{
    var id = await CreateIdInDB() // async create record in DB

    // run background task, don't wait when it finishes
    Task.Run(async () => {
        List<Task> worker = new List<Task>();
        //To limit the number of Task started.
        var throttler = new SemaphoreSlim(initialCount: 20);
        foreach (var i in listOfData)
        {
            await throttler.WaitAsync();
            worker.Add(Task.Run(async () =>
            {
                await ProcessSingle(x);
                throttler.Release();
            }
            ));
        }
        await Task.WhenAll(worker);
    });

    // return created id immediately
    return id;
}

Подробнее Как ограничить количество одновременных операций асинхронного ввода-вывода? .

Кроме того, не используйте Task.Factory.StartNew(), когда простого Task.Run() достаточно для выполнения желаемой работы, прочитайте эту превосходную статью Стивена Клири.

...