Parallel.ForEach не добавляет элементы, как ожидается, в ConcurrentBag в C# - PullRequest
2 голосов
/ 21 февраля 2020

В моем Asp.Net Core WebApi Controller я получаю IFormFile[] files. Мне нужно преобразовать это в List<DocumentData>. Я впервые использовал foreach. Работало нормально. Но позже решил изменить на Parallel.ForEach, так как я получаю много (> 5) файлов.

Вот мой DocumentData Класс:

public class DocumentData
{
    public byte[] BinaryData { get; set; }
    public string FileName { get; set; }
}

Вот мой Parallel.ForEach Logi c:

var documents = new ConcurrentBag<DocumentData>();
Parallel.ForEach(files, async (currentFile) =>
{
    if (currentFile.Length > 0)
    {
        using (var ms = new MemoryStream())
        {
            await currentFile.CopyToAsync(ms);
            documents.Add(new DocumentData
            {
                BinaryData = ms.ToArray(),
                FileName = currentFile.FileName
            });
        }
    }
});

Например, даже для двух файлов в качестве ввода, documents всегда дает один файл в качестве вывода. Я что-то упустил?

У меня изначально было List<DocumentData>. Я обнаружил, что это не потокобезопасно и изменил на ConcurrentBag<DocumentData>. Но все же я получаю неожиданные результаты. Пожалуйста, помогите, где я не прав?

Ответы [ 2 ]

6 голосов
/ 21 февраля 2020

Я думаю, это потому, что Parallel.Foreach не поддерживает async/await. Он принимает только Action в качестве ввода и выполняет его для каждого элемента. И в случае asyn c делегатов он выполнит их в режиме «забыл и забыл». В этом случае переданная лямбда будет рассматриваться как async void функция, и async void не может ожидаться.

Если бы имелась перегрузка, которая принимает Func<Task>, то это сработало бы.

I предлагаем вам создать Task s с помощью Select и использовать Task.WhenAll для их одновременного выполнения.

Например:

var tasks = files.Select(async currentFile =>
{
    if (currentFile.Length > 0)
    {
        using (var ms = new MemoryStream())
        {
            await currentFile.CopyToAsync(ms);
            documents.Add(new DocumentData
            {
                BinaryData = ms.ToArray(),
                FileName = currentFile.FileName
            });
        }
    }
});

await Task.WhenAll(tasks);

Дополнительно вы можете улучшить этот код просто возвращает экземпляр DocumentData из этого метода, и в этом случае нет необходимости изменять коллекцию documents. Task.WhenAll имеет перегрузку, которая принимает IEnumerable<Task<TResult> в качестве входных данных и создает Task из TResult массива. Итак, результат будет таким:

var tasks = files.Select(async currentFile =>
    {
        if (currentFile.Length > 0)
        {
            using (var ms = new MemoryStream())
            {
                await currentFile.CopyToAsync(ms);
                return new DocumentData
                {
                    BinaryData = ms.ToArray(),
                    FileName = currentFile.FileName
                };
            }
        }

        return null;
    });

var documents =  (await Task.WhenAll(tasks)).Where(d => d != null).ToArray();
3 голосов
/ 21 февраля 2020

У вас была правильная идея с одновременной коллекцией , но вы неправильно использовали метод TPL .

Короче говоря, вы должны быть очень осторожны с asyn c лямбдами , и если вы передаете их Action или Func<Task>

Ваша проблема Это связано с тем, что Parallel.For / ForEach не подходит для asyn c и шаблона ожидания или IO-связанных задач . Они подходят для связанных с процессором рабочих нагрузок . Это означает, что они по существу имеют параметры Action и давайте планировщик задач создадут задачи для вас

Если вы хотите запускать несколько задач одновременно, используйте Task.WhenAll, или TPL Dataflow ActionBlock, который может эффективно справляться как с CPU-привязанными , так и IO-ограниченными рабочими нагрузками, или, говоря более непосредственно, они могут справляться с tasks , что такое метод asyn c.

Фундаментальная проблема заключается в том, что когда вы вызываете asyn c lambda на Action, вы по существу создаете метод async void, который будет запускаться как задача незамеченным. То есть ваш метод TPL просто создает группу задач параллельно для запуска группы ненаблюдаемых задач и не ожидает их.

Подумайте об этом так, вы попросите группу друзей go и принесете вам продукты, они, в свою очередь, скажут кому-нибудь еще принести вам продукты, но ваши друзья отчитаются перед вами и скажут, что ваша работа выполнена. Это явно не так, и у вас нет продуктов.

...