Список процессов партиями по 1000 с использованием семафора - PullRequest
0 голосов
/ 19 января 2020

У меня есть 300 000 заказов на продажу, которые нужно отправлять партиями по 200 или 1000 / партия на вызов RestAPI и с многопоточностью, используя Semaphore и ограничение MaxDegreeOfParallelism = 8 (лучше количество ядер ЦП). Ответ каждой партии заказов необходимо добавить в общий список c. Пожалуйста, предложите любую возможность получить список ответов API для всех 300 тыс. Заказов.

  Parallel.ForEach(
    totalSalesOrdersList.Batch(1000),
    new ParallelOptions() { MaxDegreeOfParallelism = 8 /* better be number of CPU cores */ },
    batchOfSalesOrders => {
        DoMyProcessing(batchOfSalesOrders);
    });


 public static class LinqExtensions
 {
   public static IEnumerable<IEnumerable<TSource>> Batch<TSource>(
          this IEnumerable<TSource> source, int size)
   {
    TSource[] bucket = null;
    var count = 0;

    foreach (var item in source)
    {
        if (bucket == null)
            bucket = new TSource[size];

        bucket[count++] = item;
        if (count != size)
            continue;

        yield return bucket;

        bucket = null;
        count = 0;
    }

    if (bucket != null && count > 0)
        yield return bucket.Take(count);
   }
}

Ответы [ 2 ]

1 голос
/ 19 января 2020

Выполнение большого количества одновременных удаленных вызовов API должно выполняться аккуратно, поскольку вы можете исчерпать пул соединений. Я предлагаю подход с SemaphoreSlim для выполнения регулирования и Channel для использования ответов потокобезопасным способом.

var batches = Enumerable.Range(0, 1000);
var responseCh = Channel.CreateUnbounded<string>();
var throttler = new SemaphoreSlim(10);

var requestTasks = batches.Select(async batch =>
{
    await throttler.WaitAsync();
    try
    {
        var result = await MakeHttpRequestAsync(batch);
        await responseCh.Writer.WriteAsync(result);   
    }
    finally
    {
        throttler.Release();
    }
}).ToArray();

var requestProcessing = Task.Run(async () =>
{                
    await Task.WhenAll(requestTasks);
    responseCh.Writer.Complete();
});

var responseProcessing = Task.Run(async () =>
{
    await foreach (var res in responseCh.Reader.ReadAllAsync())
        Console.WriteLine(res); // or store in a data structure
});

await Task.WhenAll(requestProcessing, responseProcessing);

Мы ограничиваем запросы, не допуская более 10 одновременно. , Мы запускаем их одновременно и в момент прибытия пишем ответ на канал. Мы обрабатываем ответы в отдельном потоке, читая их из канала. Обработка запросов и ответов происходит одновременно, поэтому мы асинхронно ожидаем завершения их обоих.

Обратите внимание, что интерфейс IAsyncEnumerable (await foreach) доступен в C# 8, и каналы поставляются с. NET Core 3.1 SDK или вы можете найти их в NuGet .

1 голос
/ 19 января 2020

Вы можете использовать Parallel Linq (AsParallel) и управлять параллелизмом с помощью метода WithDegreeOfParallelism:

var result = totalSalesOrdersList
    .Batch(1000)
    .AsParallel()
    .WithDegreeOfParallelism(8)
    .Select(batch => DoMyProcessing(batch))
    .SelectMany(batch => batch)
    .ToList();

Обычно это так не рекомендуется контролировать параллелизм операций ввода-вывода с PLINQ или классом Parallel. В настоящее время наиболее «профессиональный» способ сделать это - использовать библиотеку TPL Dataflow .

...