Оптимизация времени параллельных длительных задач - PullRequest
0 голосов
/ 08 января 2019

Введение

Я работаю со сложной внешней библиотекой, где я пытаюсь выполнить ее функции в большом списке элементов. Библиотека не предоставляет хорошего асинхронного интерфейса, поэтому я застрял в некотором старомодном коде.

Моя цель состоит в том, чтобы оптимизировать время, необходимое для выполнения пакета обработки, и продемонстрировать проблему, не добавляя фактическую стороннюю библиотеку. Я создал приближение к проблеме ниже

Проблема

Учитывая не асинхронное действие, где вы можете заранее знать «размер» (то есть, сложность) действия:

public interface IAction
{
    int Size { get; }
    void Execute();
}

И учитывая, есть 3 варианта этого действия:

public class LongAction : IAction
{
    public int Size => 10000;
    public void Execute()
    {
        Thread.Sleep(10000);
    }
}

public class MediumAction : IAction
{

    public int Size => 1000;
    public void Execute()
    {
        Thread.Sleep(1000);
    }
}

public class ShortAction : IAction
{
    public int Size => 100;
    public void Execute()
    {
        Thread.Sleep(100);
    }
}

Как можно оптимизировать длинный список этих действий, чтобы при параллельном запуске весь пакет выполнялся максимально быстро?

Наивно, вы могли бы просто бросить весь лот на Parallel.ForEach, с достаточно высоким параллелизмом, и это, безусловно, работает - но должен быть способ оптимально запланировать их, чтобы некоторые из самых больших запускались первыми.

Для дальнейшей иллюстрации проблемы, если мы возьмем супер упрощенный пример

  • 1 задание размером 10
  • 5 заданий размером 2
  • 10 заданий размером 1

И 2 доступных темы. Я мог бы предложить 2 (из многих) способов планирования этих задач (черная полоса - время простоя - ничего не планировать):

enter image description here

Очевидно, что первый завершается раньше, чем второй.

Минимальный полный и проверяемый код

Весь тестовый код, если кому-то нравится bash (попробуйте сделать его быстрее, чем моя наивная реализация ниже):

class Program
{
    static void Main(string[] args)
    {
        MainAsync().GetAwaiter().GetResult();
        Console.ReadLine();
    }

    static async Task MainAsync()
    {
        var list = new List<IAction>();
        for (var i = 0; i < 200; i++) list.Add(new LongAction());
        for (var i = 0; i < 200; i++) list.Add(new MediumAction());
        for (var i = 0; i < 200; i++) list.Add(new ShortAction());


        var swSync = Stopwatch.StartNew();
        Parallel.ForEach(list, new ParallelOptions { MaxDegreeOfParallelism = 20 }, action =>
        {
            Console.WriteLine($"{DateTime.Now:HH:mm:ss}: Starting action {action.GetType().Name} on thread {Thread.CurrentThread.ManagedThreadId}");
            var sw = Stopwatch.StartNew();
            action.Execute();
            sw.Stop();
            Console.WriteLine($"{DateTime.Now:HH:mm:ss}: Finished action {action.GetType().Name} in {sw.ElapsedMilliseconds}ms on thread {Thread.CurrentThread.ManagedThreadId}");
        });
        swSync.Stop();
        Console.WriteLine($"Done in {swSync.ElapsedMilliseconds}ms");
    }
}


public interface IAction
{
    int Size { get; }
    void Execute();
}

public class LongAction : IAction
{
    public int Size => 10000;
    public void Execute()
    {
        Thread.Sleep(10000);
    }
}

public class MediumAction : IAction
{

    public int Size => 1000;
    public void Execute()
    {
        Thread.Sleep(1000);
    }
}

public class ShortAction : IAction
{
    public int Size => 100;
    public void Execute()
    {
        Thread.Sleep(100);
    }
}

Ответы [ 4 ]

0 голосов
/ 08 января 2019

Относительно быстрое и грязное решение - использовать разделитель балансировки нагрузки поверх списка действий, отсортированного по убыванию размера

var sorted = list.OrderByDescending(a => a.Size).ToArray();
var partitioner=Partitioner.Create(sorted, loadBalance:true);

Parallel.ForEach(partitioner, options, action =>...);

Использование только этих двух строк повышает производительность примерно на 30%, как и другие ответы.

PLINQ разделяет данные и использует отдельные задачи для обработки всего раздела за один раз. Когда размер ввода известен, как в случае с массивами и списками, полученными из IList, вход разбивается на порции одинакового размера и подается в каждую рабочую задачу.

Когда размер неизвестен, как в случае с методами итераторов, запросами LINQ и т. Д., PLINQ использует разбиение на фрагменты. Блок данных извлекается за раз и передается рабочим задачам.

Другая опция, о которой я забыл, это балансировка нагрузки при разбиении верхнего чанка. Это применяет разбиение фрагментов с использованием небольших фрагментов на массивы и входные данные, полученные из IList. Балансировка нагрузки Partitioner.Create перегрузки возвращают экземпляры OrderablePartitioner, поэтому порядок элементов IAction сохраняется

То же самое можно сделать с источником IEnumerable<T>, указав параметр EnumerablePartitionerOptions.NoBuffering:

var sorted = list.OrderByDescending(a => a.Size);
var partitioner=Partitioner.Create(sorted,EnumerablePartitionerOptions.NoBuffering);

Это создаст OrderablePartitioner, который использует кодировку чанка

0 голосов
/ 08 января 2019

Ну, это зависит. На моем оборудовании ваш надуманный пример (измененный таким образом, что сны составляют 1000, 100 и 10 мс, поскольку у меня нет всего дня) будет на ~ 30% быстрее (~ 15 с против ~ 22 с), если я просто изменю цикл для запуска всех длинных задач первый:

Parallel.ForEach(list.OrderByDescending(l=>l.Size), action => ...

Но, конечно, это зависит полностью от нагрузки этих задач. Если две разные задачи интенсивно используют один и тот же ресурс (например, совместно используемую базу данных), выигрыш от параллельного выполнения этих двух задач может быть очень ограниченным, поскольку они просто блокируют друг друга на периоды на некотором уровне.

Я бы предложил вам провести более глубокий анализ, а затем как-то сгруппировать задачи, исходя из того, насколько они «параллельны», они основаны на том, что они на самом деле делают, и постараться обеспечить выполнение как можно большего количества параллельных потоков. с максимально возможным количеством «совместимых» задач ... И, конечно, если одна конкретная задача, кажется, всегда занимает столько же времени, сколько все остальные вместе взятые, убедитесь, что она запускается первой ....

Очень трудно дать лучший совет с деталями, приведенными здесь.

0 голосов
/ 08 января 2019

Сортировка по размеру задачи в порядке убывания, а затем использование TaskFactory для выполнения каждой задачи в отдельности, сэкономившее значительное время работы. Уровень параллелизма остался 20. Результаты: 114 676 мс против 193 713 мс в исходном образце. (Улучшение ~ 40%)

РЕДАКТИРОВАТЬ: В вашем конкретном примере список сортируется с самого начала, но Parallel.ForEach не сохраняет порядок ввода.

static async Task MainAsync()
{
    var list = new List<IAction>();
    for (var i = 0; i < 200; i++) list.Add(new LongAction());
    for (var i = 0; i < 200; i++) list.Add(new MediumAction());
    for (var i = 0; i < 200; i++) list.Add(new ShortAction());

    Console.WriteLine("Sorting...");
    list.Sort((x, y) => y.Size.CompareTo(x.Size));
    int totalTasks = 0;

    int degreeOfParallelism = 20;
    var swSync = Stopwatch.StartNew();
    using (SemaphoreSlim semaphore = new SemaphoreSlim(degreeOfParallelism))
    {
        foreach (IAction action in list)
        {
            semaphore.Wait();
            Task.Factory.StartNew(() =>
            {
                try
                {
                    Console.WriteLine($"{DateTime.Now:HH:mm:ss}: Starting action {action.GetType().Name} on thread {Thread.CurrentThread.ManagedThreadId}");
                    var sw = Stopwatch.StartNew();
                    action.Execute();
                    sw.Stop();
                    Console.WriteLine($"{DateTime.Now:HH:mm:ss}: Finished action {action.GetType().Name} in {sw.ElapsedMilliseconds}ms on thread {Thread.CurrentThread.ManagedThreadId}");
                }
                finally
                {
                    totalTasks++;
                    semaphore.Release();
                }
            });
        }

        // Wait for remaining tasks....
        while (semaphore.CurrentCount < 20)
        { }

        swSync.Stop();
        Console.WriteLine($"Done in {swSync.ElapsedMilliseconds}ms");
        Console.WriteLine("Performed total tasks: " + totalTasks);
    }
}
0 голосов
/ 08 января 2019

Сразу скажу, что проблема в следующем.

У вас есть список целых чисел и ограниченное количество лет. Вам нужен алгоритм, который суммирует целые числа в летние значения, чтобы максимальное значение летних значений было минимально возможным.

например:.

list = 1, 4, 10, 2, 3, 4
summers = 3

summer(1): 10
summer(2): 4 + 4
summer(3): 3 + 2 + 1

Как видите, ограничивающим фактором является самая продолжительная задача. Более короткие из них легко подаются параллельно или за меньшее время. Это похоже на рюкзак, но в итоге сводится к очень простому «самому длинному первому» порядку задач.

Псевдокод (с моими изобретенными классами) будет:

while (taskManager.HasTasks())
{
    task = taskManager.GetLongestTask();
    thread = threadManager.GetFreeThread(); // blocks if no thread available
    thread.Run(task);
}

Это просто псевдокод, а не параллельный / асинхронный и блоки. Я надеюсь, что вы можете извлечь из этого что-то полезное.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...