Parallel.Foreach с общим абсолютным индексом для всех параллельных процессов - PullRequest
0 голосов
/ 26 мая 2020

Я запускаю процесс, который, например, работает с 10 файлами одновременно. Мне нужно назначить серийный номер на основе порядка ввода массива входных файлов. Итак, для каждого параллельного процесса мне нужно назначить порядковые номера, которые будут использоваться в параллельном процессе, в том же порядке, что и массив входных строк myFiles. Нужен ли мне какой-нибудь потокобезопасный или параллельный тип int? Какой правильный подход?

var results = new ConcurrentQueue<string>();
var options = new ParallelOptions
    { MaxDegreeOfParallelism = Environment.ProcessorCount * 10 };
int startSerialNumber = 1;
if (runParallel)
{
    Parallel.ForEach(myFiles, options, (myFile) =>
    {
        var newMyFile = WorkOnMyFile(myFile,startSerialNumber);
        startSerialNumber += SubFileCount; // <--This needs to be shared
            // for all parallel processes where how do I control incrementing?
        results.Enqueue(RunExeTask(newMyFile, outputDirectory,false));
    });
}

Ответы [ 3 ]

2 голосов
/ 26 мая 2020

Генерировать серийные номера вне параллельной обработки. Увеличение числа тривиально, поэтому вам не нужно делать это в нескольких потоках. По мере их создания соедините их с элементами в вашем списке, чтобы создать новый список, содержащий и то, и другое, а затем выполните итерацию по нему.

var myData = myFiles
    .Select
    (
        (f, i) => new { File = f, SerialNumber = startingSerialNuber + (i * SubFileCount) }
    )
    .ToList();
Parallel.ForEach(myData, options, (myItem) =>
{
    myFile = myItem.File;
    serialNumber = myItem.SerialNumber;
    var newMyFile = WorkOnMyFile(myFile,serialNumber);
    results.Enqueue(RunExeTask(newMyFile, outputDirectory,false));
});
0 голосов
/ 27 мая 2020

Я предлагаю использовать PLINQ вместо класса Parallel, потому что первый по своей сути способен собирать потокобезопасные обработанные результаты и возвращать их (необязательно) в исходном порядке. Это также упрощает получение индекса текущего обрабатываемого элемента с помощью перегрузки Select, которая принимает индекс:

public static ParallelQuery<TResult> Select<TSource, TResult> (
    this ParallelQuery<TSource> source,
    Func<TSource, int, TResult> selector);

Пример использования:

string[] results = myFiles
    .AsParallel()
    .AsOrdered() // Optional, by default the original order will not be preserved
    .WithDegreeOfParallelism(runParallel ? Environment.ProcessorCount : 1)
    .Select((myFile, index) =>
    {
        var newMyFile = WorkOnMyFile(myFile, index);
        return RunExeTask(newMyFile, outputDirectory, false);
    }).ToArray();
0 голосов
/ 26 мая 2020

Вы можете попробовать что-то вроде этого:

    var results = new ConcurrentQueue<string>();
    var options = new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount * 10 };
    int startSerialNumber = 1;
    if (runParallel)
    {
        new Thread(() =>
        {
            Task.Run(() =>
            {
                Parallel.ForEach(myFiles, options, (myFile) =>
                {
                    var newMyFile = WorkOnMyFile(myFile, startSerialNumber);
                    Interlocked.Add(ref startSerialNumber, SubFileCount);
                results.Enqueue(RunExeTask(newMyFile, outputDirectory, false));
                });
            }).Wait();
        }).Start();
    }

По сути, это запускает вашу операцию в фоновом потоке, который ожидает завершения задачи, прежде чем продолжить. Пока вы не ссылаетесь на startSerialNumber где-либо еще, все будет в порядке.

...