Влияние использования AsParallel () и AsSequential () в одном запросе? C# - PullRequest
1 голос
/ 13 февраля 2020

Я просматривал PLINQ в одной из книг, где говорилось:

Если у вас сложный запрос, который может выиграть от параллельной обработки, но также есть некоторые части, которые должны выполняться последовательно, вы можно использовать AsSequential, чтобы остановить параллельную обработку вашего запроса.

Например:

var parallelResult = numbers.AsParallel().AsOrdered()
    .Where(i => i % 2 == 0).AsSequential();

Я хочу понять, почему это разрешено и как это влияет на результат? Это работает параллельно? Это работает последовательно? Сейчас это не имеет никакого смысла.

1 Ответ

1 голос
/ 15 февраля 2020

Вы можете концептуализировать запрос LINQ как конструкцию atomi c с одним планом выполнения, но может оказаться более полезным концептуализировать его как конвейер, состоящий из нескольких блоков потока данных. Выходные данные каждого блока становятся входными данными следующего блока в потоке данных, и блоки обрабатывают элементы одновременно, как только они становятся доступными. Посмотрите, например, на следующий запрос, состоящий из двух «блоков», представленных двумя операторами Select. Первый блок сконфигурирован для одновременной обработки 3 элементов (параллельно), а второй блок сконфигурирован для последовательной обработки каждого элемента. Продолжительность обработки каждого элемента составляет 1000 мсэ c для параллельного блока и 500 мсэ c для последовательного блока:

var results = Enumerable.Range(1, 10)
    .AsParallel()
    .AsOrdered()
    .WithDegreeOfParallelism(3)
    .WithMergeOptions(ParallelMergeOptions.NotBuffered)
    .Select(x =>
    {
        Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}"
            + $" [{Thread.CurrentThread.ManagedThreadId}] Parallel #{x}");
        Thread.Sleep(1000); // Simulate some CPU-bound work
        return x;
    })
    .AsSequential()
    .Select(x =>
    {
        Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}"
            + $" [{Thread.CurrentThread.ManagedThreadId}] Sequential #{x}");
        Thread.Sleep(500); // Simulate some CPU-bound work
        return x;
    })
    .ToArray();
Console.WriteLine($"Results: {String.Join(", ", results)}");

Если вы запустите этот код, вы получите такой вывод :

08:32:17.628 [4] Parallel #2
08:32:17.628 [5] Parallel #1
08:32:17.628 [6] Parallel #3
08:32:18.642 [6] Parallel #5
08:32:18.642 [5] Parallel #4
08:32:18.644 [4] Parallel #6
08:32:18.651 [1] Sequential #1
08:32:19.644 [6] Parallel #7
08:32:19.645 [4] Parallel #8
08:32:19.646 [5] Parallel #9
08:32:19.654 [1] Sequential #2
08:32:20.156 [1] Sequential #3
08:32:20.648 [4] Parallel #10
08:32:20.658 [1] Sequential #4
08:32:21.161 [1] Sequential #5
08:32:21.663 [1] Sequential #6
08:32:22.164 [1] Sequential #7
08:32:22.672 [1] Sequential #8
08:32:23.173 [1] Sequential #9
08:32:23.675 [1] Sequential #10
Results: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10

Обратите внимание, что последовательная обработка уже запущена до того, как будет завершена вся параллельная обработка. Для достижения этого эффекта я использовал опцию конфигурации WithMergeOptions(ParallelMergeOptions.NotBuffered), чтобы минимизировать выходную буферизацию первого блока. Здесь можно найти другие варианты: ParallelMergeOptions.

Для полноты изложения давайте перепишем этот запрос, используя библиотеку TPL Dataflow . Код становится более многословным и менее гибким, но контроль выполнения становится более точным, а также становятся доступными асинхронные рабочие процессы (PLINQ не является асинхронным c -дружественным):

var block1 = new TransformBlock<int, int>(async x =>
{
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}"
        + $" [{Thread.CurrentThread.ManagedThreadId}] Parallel #{x}");
    await Task.Delay(1000); // Simulate some I/O operation
    return x;
}, new ExecutionDataflowBlockOptions()
{
    MaxDegreeOfParallelism = 3,
    EnsureOrdered = true // redundant since EnsureOrdered is the default
});

var block2 = new TransformBlock<int, int>(async x =>
{
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}"
        + $" [{Thread.CurrentThread.ManagedThreadId}] Sequential #{x}");
    await Task.Delay(500); // Simulate some I/O operation
    return x;
}); // MaxDegreeOfParallelism = 1 is the default

block1.LinkTo(block2, new DataflowLinkOptions() { PropagateCompletion = true });

// Feeding the first block
foreach (var x in Enumerable.Range(1, 10))
{
    await block1.SendAsync(x);
}
block1.Complete();

var results = new List<int>(); // Collecting the results is a bit painful
while (await block2.OutputAvailableAsync())
{
    while (block2.TryReceive(out var result))
    {
        results.Add(result);
    }
}
await block2.Completion;
Console.WriteLine($"Results: {String.Join(", ", results)}");

Вывод:

08:59:25.102 [6] Parallel #2
08:59:25.102 [4] Parallel #1
08:59:25.102 [7] Parallel #3
08:59:26.127 [7] Parallel #4
08:59:26.129 [6] Parallel #5
08:59:26.143 [4] Parallel #6
08:59:26.147 [5] Sequential #1
08:59:26.648 [5] Sequential #2
08:59:27.129 [6] Parallel #7
08:59:27.129 [7] Parallel #8
08:59:27.144 [4] Parallel #9
08:59:27.149 [5] Sequential #3
08:59:27.650 [5] Sequential #4
08:59:28.131 [6] Parallel #10
08:59:28.152 [5] Sequential #5
08:59:28.653 [5] Sequential #6
08:59:29.155 [5] Sequential #7
08:59:29.659 [5] Sequential #8
08:59:30.160 [5] Sequential #9
08:59:30.674 [5] Sequential #10
Results: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
...