Улучшение параллельной работы C # - PullRequest
0 голосов
/ 24 февраля 2019

У меня 8 логических процессоров.При выполнении следующего кода

  public void test()
    {
        Parallel.For( 1, 1001, i => { IntensiveWork(i); });
    }

    private static void IntensiveWork(int i)
    {
        Random r=new Random();
        Thread.Sleep(r.Next(i*1));
    }

я замечаю, что Parallel.For выполняет несколько пакетов по 8 заданий.Каждая партия будет выполняться последовательно.Проблема заключается в том, что если 7/8 заданий в пакете завершено, то следующий пакет будет ожидать завершения последнего задания.Это означает, что 7 ядер не будут заняты.Есть ли лучший способ реализовать параллелизм - это C #, в котором, как только задание в пакете завершается, оно назначит этому ядру другое задание.

Ответы [ 2 ]

0 голосов
/ 24 февраля 2019

Попробуйте Microsoft Reactive Framework (он же Rx) - просто NuGet System.Reactive, а затем добавьте using System.Reactive.Linq; - тогда вы можете сделать следующее:

public void test()
{
    IObservable<Unit> query =
        Observable
            .Range(1, 1000)
            .SelectMany(i =>
                Observable
                    .Start(() => IntensiveWork(i)));

    IDisposable subscription = query.Subscribe();
}

private static Random r = new Random();

private static void IntensiveWork(int i)
{
    Thread.Sleep(r.Next(i * 1));
}

Играть с .Subscribe(..., чтобы иметь возможность отвечатьк каждому рабочему элементу после его завершения.

0 голосов
/ 24 февраля 2019

Вы можете создать одну очередь, из которой будет считываться несколько задач.

static void test()
{
    ConcurrentQueue<int> queue = new ConcurrentQueue<int>(Enumerable.Range(1, 1000));
    int taskCount = Environment.ProcessorCount;
    Task[] tasks = new Task[taskCount];
    for (int taskIndex = 0; taskIndex < taskCount; taskIndex++)
    {
        Task task = Task.Factory.StartNew(() => IntensiveWorkTask(queue));
        tasks[taskIndex] = task;
    }
    Task.WaitAll(tasks);
}

private static void IntensiveWorkTask(ConcurrentQueue<int> queue)
{
    while (queue.TryDequeue(out int value))
        IntensiveWork(value);
}

private static void IntensiveWork(int i)
{
    Random r = new Random();
    Thread.Sleep(r.Next(i * 1));
}
...