Как распараллелить несколько разнородных заданий с определенной степенью параллелизма c - PullRequest
0 голосов
/ 27 марта 2020

Я хочу создать несколько документов, которые требуют довольно интенсивных вычислений ЦП для их создания. Документы неоднородны, некоторые в формате PDF, некоторые в Excel, а некоторые в формате Word. Мой ввод - это три последовательности идентификаторов, например:

// Input
int[] pdfIDs = new[] { 1, 2, 3 };
int[] xlsIDs = new[] { 11, 12, 13, 14 };
int[] docIDs = new[] { 21, 22 };

У меня уже есть методы, которые создают каждый из этих типов документов. Вот упрощенная версия этих методов:

PdfFile CreatePdfFile(int id)
{
    Thread.Sleep(1000); // Simulate some heavy calculation
    return new PdfFile();
}
XlsFile CreateXlsFile(int id)
{
    Thread.Sleep(1500); // Simulate some heavy calculation
    return new XlsFile();
}
DocFile CreateDocFile(int id)
{
    Thread.Sleep(2000); // Simulate some heavy calculation
    return new DocFile();
}

class PdfFile { public byte[] Bytes { get; set; } }
class XlsFile { public byte[] Bytes { get; set; } }
class DocFile { public byte[] Bytes { get; set; } }

Эти методы должны вызываться параллельно, в противном случае создание документов занимает слишком много времени. Я мог бы использовать PLINQ для параллельного создания каждого типа отдельно, но это было бы неэффективно, потому что степень параллелизма снизилась бы между завершением одного типа и началом следующего. Поэтому желательно, чтобы все документы были распараллелены, как если бы они принадлежали в одной последовательности, которая будет обрабатываться с одной настраиваемой степенью параллелизма от начала до конца sh. Желаемый результат - три массива, содержащие созданные документы:

// Output
PdfFile[] pdfFiles;
XlsFile[] xlsFiles;
DocFile[] docFiles;

Весь процесс не должен быть асинхронным. Можно блокировать текущий поток до тех пор, пока не будут созданы все документы.

Как мне достичь этой цели?

Кстати, здесь есть связанный вопрос с хорошими ответами: Ожидание нескольких Задачи с разными результатами , но этот вопрос имеет дело с более простым сценарием (нет списков, нет требования для определенной c степени параллелизма). Поэтому эти ответы не могут быть использованы для решения этой более сложной проблемы.

Ответы [ 3 ]

3 голосов
/ 28 марта 2020

Поэтому желательно, чтобы все документы были распараллелены, как если бы они принадлежали в одной последовательности, которая будет обрабатываться с одной настраиваемой степенью параллелизма от начала до конца sh.

Я обычно рекомендую использовать примитив самого высокого уровня, который возможен. В вашем случае, поскольку у вас есть разнородные действия с разными типами результатов, и вы также хотите иметь одну степень параллелизма, это ограничивает ваши параметры.

PLINQ - это вариант, хотя вам необходимо объединить входные данные и типы результатов. Что-то вроде:

int[] pdfIDs = new[] { 1, 2, 3 };
int[] xlsIDs = new[] { 11, 12, 13, 14 };
int[] docIDs = new[] { 21, 22 };

var inputs = pdfIDs.Select(id => (Type: "pdf", Id: id))
    .Concat(xlsIDs.Select(id => (Type: "xls", Id: id)))
    .Concat(docIDs.Select(id => (Type: "doc", Id: id)));
var process = inputs.AsParallel()
    .WithDegreeOfParallelism(3)
    .Select(x =>
    {
        switch (x.Type)
        {
            case "pdf": return (x.Type, File: (object) CreatePdfFile(x.Id));
            case "xls": return (x.Type, File: (object) CreateXlsFile(x.Id));
            case "doc": return (x.Type, File: (object) CreateDocFile(x.Id));
            default: throw new InvalidOperationException($"Unknown type {x.Type}");
        }
    });
var results = process.ToList();

PdfFile[] pdfFiles = results.Where(x => x.Type == "pdf").Select(x => (PdfFile) x.File).ToArray();
XlsFile[] xlsFiles = results.Where(x => x.Type == "xls").Select(x => (XlsFile)x.File).ToArray();
DocFile[] odsFiles = results.Where(x => x.Type == "doc").Select(x => (DocFile)x.File).ToArray();

Или что-то в этом роде с лучшей безопасностью типов и меньшим количеством магических c строк. Перечисление и Choice с некоторыми выражениями-переключателями сделают это лучше. :)

В качестве альтернативы, Parallel будет работать хорошо. В этом случае, возможно, Parallel.Invoke, где отдельные действия ответственны за сохранение своих собственных результатов в поточно-ориентированной коллекции:

int[] pdfIDs = new[] { 1, 2, 3 };
int[] xlsIDs = new[] { 11, 12, 13, 14 };
int[] docIDs = new[] { 21, 22 };

var pdfFileResults = new ConcurrentDictionary<int, PdfFile>();
var xlsFileResults = new ConcurrentDictionary<int, XlsFile>();
var docFileResults = new ConcurrentDictionary<int, DocFile>();

var pdfActions = pdfIDs.Select(id => (Action) (() => pdfFileResults.TryAdd(id, CreatePdfFile(id))));
var xlsActions = xlsIDs.Select(id => (Action) (() => xlsFileResults.TryAdd(id, CreateXlsFile(id))));
var docActions = docIDs.Select(id => (Action) (() => docFileResults.TryAdd(id, CreateDocFile(id))));

Parallel.Invoke(new ParallelOptions { MaxDegreeOfParallelism = 3 },
    pdfActions.Concat(xlsActions).Concat(docActions).ToArray());

PdfFile[] pdfFiles = pdfFileResults.Values.ToArray();
XlsFile[] xlsFiles = xlsFileResults.Values.ToArray();
DocFile[] odsFiles = docFileResults.Values.ToArray();

Подход PLINQ - из-за его разделения - имеет тенденцию разделять работать между различными типами файлов. Подход Parallel.Invoke имеет тенденцию проходить вниз по массиву действий по одному блоку за раз. Не уверен, что вы предпочтете.

Наконец, есть подход параллелизма на основе задач. Я вообще не рекомендую это из-за его сложности; его реальный сценарий использования - сценарий ios, где каждая задача может создавать больше задач, а не сценарий ios, подобный этому, где общее количество задач известно заранее. Поэтому я не рекомендую этот, но он интересен для полноты:

int[] pdfIDs = new[] { 1, 2, 3 };
int[] xlsIDs = new[] { 11, 12, 13, 14 };
int[] docIDs = new[] { 21, 22 };

var scheduler = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, maxConcurrencyLevel: 3).ConcurrentScheduler;
var factory = new TaskFactory(scheduler);

var pdfTasks = pdfIDs.Select(id => factory.StartNew(() => CreatePdfFile(id))).ToList();
var xlsTasks = xlsIDs.Select(id => factory.StartNew(() => CreateXlsFile(id))).ToList();
var docTasks = docIDs.Select(id => factory.StartNew(() => CreateDocFile(id))).ToList();

Task.WaitAll(pdfTasks.Cast<Task>().Concat(xlsTasks).Concat(docTasks).ToArray());

PdfFile[] pdfFiles = pdfTasks.Select(x => x.Result).ToArray();
XlsFile[] xlsFiles = xlsTasks.Select(x => x.Result).ToArray();
DocFile[] odsFiles = docTasks.Select(x => x.Result).ToArray();

Поскольку это все синхронные задачи, я бы использовал ConcurrentExclusiveSchedulerPair.ConcurrentScheduler вместо SemaphoreSlim. Это обычный шаблон для регулирования параллельного кода на основе задач.

Параллельный подход на основе задач имеет выполнение, аналогичное Parallel.Invoke; поскольку все задачи ставятся в очередь в планировщике по группам по типу, они, как правило, выполняются.

В качестве заключительного замечания я должен вставить плагин для моей книги ; Я искренне думаю, что вам понравится. Мой блог посвящен асинхронности; моя книга также описывает параллелизм.

2 голосов
/ 27 марта 2020

Если ваш метод полностью синхронизирован c - между ними не будет падений, задача извлечет поток из пула, выполнит синхронизацию c часть до завершения и вернет поток в пул, вот и все. Без перерывов. Это будет только для вашего syn c неполного рабочего дня (переключение может произойти только в ожидании). Для кодирования некоторой параллельной степени я обычно использую семафор:

class Test
{
    public PdfFile CreatePdfFile(int id)
    {
        Work(1000); // Simulate some heavy calculation
        return new PdfFile();
    }
    public XlsFile CreateXlsFile(int id)
    {
        Work(1500); // Simulate some heavy calculation
        return new XlsFile();
    }
    public DocFile CreateDocFile(int id)
    {
        Work(2000); // Simulate some heavy calculation
        return new DocFile();
    }

    private void Work(int miliseconds)
    {
        var step = 100;
        while (miliseconds > 0)
        {
            Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
            Thread.Sleep(step);
            miliseconds -= step;
        }
    }

    public class PdfFile { public byte[] Bytes { get; set; } }
    public class XlsFile { public byte[] Bytes { get; set; } }
    public class DocFile { public byte[] Bytes { get; set; } }
}
class Program
{
    static void Main(string[] args)
    {
        var cts = new CancellationTokenSource();
        var degreeOfParallelism = 1;
        var itemCount = 10;

        var test = new Test();
        var sema = new SemaphoreSlim(degreeOfParallelism);

        var tasks = Enumerable.Range(0, itemCount).Select(x =>
         {
             return Task.Run(async () =>
             {
                 await sema.WaitAsync(cts.Token);
                 try
                 {   //<---- here your "exclusive thread" starts
                     if (x % 3 == 0)
                     {
                         test.CreateDocFile(x);
                     }
                     else if (x % 3 == 1)
                     {
                         test.CreatePdfFile(x);
                     }
                     else if (x % 3 == 2)
                     {
                         test.CreateXlsFile(x);
                     }
                 }
                 finally
                 {   //<---- here your "exclusive thread" ends.
                     sema.Release();
                 }
             }, cts.Token);
         }).ToArray();

        Task.WaitAll(tasks);
        Console.WriteLine("Done");
        Console.ReadKey();
    }
}

Это не так очевидно, как создание собственного пула потоков, но ведет себя так же.

Если вы перемещаете семафор в Выберите метод, вы получите более четкое представление, потому что он не будет ставить задачи в пул, а блокировать, пока не сможет.

1 голос
/ 29 марта 2020

Я хотел бы добавить еще одно решение проблемы. Это не особенно заманчиво, поскольку в нем используется конструктор anachronisti c Task, но он предлагает иногда желаемую функцию: быстро завершается неудачей в случае исключения. Степень параллелизма обеспечивается ActionBlock, который запускает и ожидает каждую задачу.

var pdfTasks = pdfIDs.Select(id => new Task<PdfFile>(() => CreatePdfFile(id))).ToArray();
var xlsTasks = xlsIDs.Select(id => new Task<XlsFile>(() => CreateXlsFile(id))).ToArray();
var docTasks = docIDs.Select(id => new Task<DocFile>(() => CreateDocFile(id))).ToArray();

var allTasks = Enumerable.Empty<Task>()
    .Concat(pdfTasks)
    .Concat(xlsTasks)
    .Concat(docTasks);

var block = new ActionBlock<Task>(async task =>
{
    task.Start();
    await task;
}, new ExecutionDataflowBlockOptions()
{
    MaxDegreeOfParallelism = 3
});

foreach (var task in allTasks)
{
    block.Post(task);
}
block.Complete();
block.Completion.Wait();

PdfFile[] pdfFiles = pdfTasks.Select(t => t.Result).ToArray();
XlsFile[] xlsFiles = xlsTasks.Select(t => t.Result).ToArray();
DocFile[] docFiles = docTasks.Select(t => t.Result).ToArray();
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...