Как запустить параллельные пакеты из пропущенного списка? - PullRequest
0 голосов
/ 01 апреля 2020

У меня есть список заказов, в которых я пытаюсь обработать 2000 одновременно. То, что я хочу сделать, это обработать каждую из 2000 партий заказов одновременно, и как только все будут закончены, вернитесь. В приведенном ниже примере кода я беру список заказов и отправляю партию в CreateOrders, когда добавляю ее к завершенным заказам, так как мне нужно, чтобы все они возвращались обратно. Как я могу параллельно обрабатывать эти партии 2000 в этом случае?

public List<Order> BatchOrders(List<Order> orders)
        {
            var completedOrders = new List<Order>();
            int batchSize = 2000;

            //Create orders in batch
            for (int i = 0; i < orders.Count(); i += batchSize )
            {
                var batchOrders = orders.Skip(i).Take(batchSize).ToList();
                completedOrders.AddRange(CreateOrders(batchOrders));
            }

            return completedOrders;
        }

Ответы [ 4 ]

0 голосов
/ 02 апреля 2020

Вот как это можно сделать с помощью библиотеки TPL Dataflow . Вам потребуется два блока: один BatchBlock для выполнения пакетной обработки и один TransformManyBlock для обработки каждой партии.

List<Order> BatchOrders(List<Order> orders,
    CancellationToken cancellationToken = default)
{
    var batchBlock = new BatchBlock<Order>(batchSize: 2000);

    var tranformBlock = new TransformManyBlock<Order[], Order>(batch =>
    {
        return CreateOrders(batch.ToList());
    }, new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = 5,
        CancellationToken = cancellationToken
    });

    batchBlock.LinkTo(tranformBlock,
        new DataflowLinkOptions() { PropagateCompletion = true });

    foreach (var order in orders)
    {
        batchBlock.Post(order);
    }
    batchBlock.Complete();

    var results = new List<Order>();
    while (tranformBlock.OutputAvailableAsync().Result)
    {
        while (tranformBlock.TryReceive(out var order))
        {
            results.Add(order);
        }
    }
    tranformBlock.Completion.GetAwaiter().GetResult();
    return results;
}

Эта библиотека встроена в . NET Core. Преимущества:

  1. Встроенная поддержка настройки максимальной степени параллелизма
  2. Встроенная поддержка отмены
  3. Быстро завершается неудачей в случае исключения ( в отличие от Task.WaitAll например)
  4. Может обрабатывать более сложные сценарии ios
  5. Может быть относительно легко преобразован в асинхронный c

И недостатки :

  1. Менее знаком, чем альтернативы
  2. Извлечение результатов из последнего блока несколько громоздко
  3. Не встроено. NET Framework (требуется пакет установка)
0 голосов
/ 01 апреля 2020

Как то так?

public List<Order> BatchOrders(List<Order> orders)
{        
    int batchSize = 2000;

    List <Task<IEnumerable<Order>>> tasks = new List<Task<IEnumerable<Order>>>();

    //Create orders in batch
    for (int i = 0; i < orders.Count; i += batchSize)
    {
        var batchOrders = orders.Skip(i).Take(batchSize).ToList();

        // Run CreateOrders as a task and store the task
        tasks.Add(Task.Run(() => CreateOrders(batchOrders)));
    }

    var allTasks = tasks.ToArray();

    // Wait till all the tasks are complete
    Task.WaitAll(allTasks);

    var completedOrders = new List<Order>();

    //Merge the results
    foreach (var task in allTasks)
        completedOrders.AddRange(task.Result);

    return completedOrders;
}
0 голосов
/ 01 апреля 2020

Я думаю, что ответы, которые уже предоставлены, великолепны!

Я хотел бы поделиться своими, поскольку я люблю декларативный и краткий код.

Обратите внимание, что хотя этот код имеет лучшая читаемость , у него худшая эффективность времени выполнения , поскольку исходная коллекция перечисляется столько раз, сколько существует пакетов.

Методы расширения определены на IReadonlyCollection<T>, а не на IEnumerable<T> чтобы исключить лениво вычисленные перечисления, которые загружают данные из внешних источников данных.

Во-первых, некоторые помощники по расширению

public static class BatchExtensions
{
    public static IEnumerable<T> TakePart<T>(this IReadOnlyCollection<T> data, int batchNumber, int batchSize) =>
        data
        .Skip(batchNumber * batchSize)
        .Take(batchSize);

    public static IEnumerable<IEnumerable<T>> Batch<T>(this IReadOnlyCollection<T> data, int batchSize) =>
        Enumerable
        .Range(0, data.Count / batchSize)
        .Select(index => TakePart(data, index, batchSize));
}

Теперь ваш метод довольно короткий и лаконичный

    async Task<IEnumerable<CreatedOrder>> BatchOrders(List<Order> orders, int batchSize)
    {
        var batches = 
            orders
            .Batch(batchSize)
            .Select(batch => Task.Run(() => CreateOrders(batch)))
            .ToArray();

        var result = (await Task.WhenAll(batches)).SelectMany(x=>x);

        return result;
    }
0 голосов
/ 01 апреля 2020

Попробуйте вспомогательный метод, подобный следующему:

public static class BatchExtensions
{
    public static IEnumerable<List<T>> Batch<T>(this IEnumerable<T> col, int batchSize = 2000)
    {
        var batch = new List<T>(batchSize);
        foreach (var o in col)
        {
            batch.Add(o);
            if (batch.Count == batchSize)
            {
                var rc = batch;
                batch = new List<T>(batchSize);
                yield return rc;
            }
        }
        if (batch.Count > 0)
        {
            yield return batch;
        }
    }
}

Затем для параллельной обработки каждой партии, например:

public List<Order> BatchOrders(List<Order> orders)
{
    var completedOrderBatches = new ConcurrentBag<List<Order>>();

    var opts = new ParallelOptions() { MaxDegreeOfParallelism = 4 };

    Parallel.ForEach(orders.Batch(),opts,  
        batch =>
           {
               completedOrderBatches.Add(CreateOrders(batch));
           });

    return completedOrderBatches.SelectMany(c => c).ToList();
}

Если вам нужен номер партии в CreateOrders, тогда вы можно использовать Parallel.For вместо Paralle.ForEach. Точно так же, как for вместо foreach.

public List<Order> BatchOrders(List<Order> orders)
{
    var completedOrders = new ConcurrentBag<List<Order>>();

    var opts = new ParallelOptions() { MaxDegreeOfParallelism = 4 };

    int batchSize = 2000;
    var batches = orders.Batch(batchSize).ToList();
    Parallel.For(0,batches.Count,opts, 
        batchNum =>
           {
               var batch = batches[batchNum];
               var startId = batchNum * batchSize;
               completedOrders.Add(CreateOrders(batch, startId));
           });

    return completedOrders.SelectMany(c => c).ToList();
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...