C# Parallel.Foreach ... Multi theading с неизвестным количеством потоков - PullRequest
0 голосов
/ 28 января 2020

У меня есть процесс syn c, который должен выполняться на каждом из моих предприятий. Количество предприятий постоянно меняется.

Я читал документы о классе Thread, Parallelism..et c ... Я не уверен, что понимаю, как это сделать, не зная / называя заранее определенное количество потоков ... в этом случае это число неизвестно. По этой причине я нашел Parallel.ForEach ... потому что я sh могу запустить неизвестное количество одновременных операций

Мои операции syn c выполняются каждые 10 минут. Каждый из них занимает до минуты или двух, чтобы бежать. Очевидно, что я не могу запустить их итеративно, потому что к тому времени, когда они завершат sh, будет запущен следующий вызов.

Я хочу запустить их одновременно в отдельных потоках. Хотя каждый из них должен иметь уникальные ключи API, они не разделяют mem или данные и не будут изменять никакие общие данные.

Для этого я провел некоторое исследование о том, как выполнять многопоточность ... I Я думаю, что Parallel.ForEach сделает свое дело ...

Мне нужна помощь с синтаксисом ...

Это в рабочей службе ... У меня есть частный метод под названием SyncBusiness(int businessId), который вызывает конечную точку API, которая синхронизирует бизнес. Легко ... просто нужна помощь с вызовом метода?

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    var businessIds = (from x in _db.Poslookup
                       select x.BusinessId).Distinct();

    while (!stoppingToken.IsCancellationRequested)
    {
        // Want to multi-thread a sync for each of the businesses in businessIds
        Parallel.ForEach(businessIds, i => { 
            await SyncBusiness(i)
        });

        _logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);
        await Task.Delay(600000, stoppingToken);
    }
}

Также, пожалуйста, прокомментируйте любые ошибки, касающиеся масштабируемости, ограничений потоков ... и т. Д. c .... любые области, в которые я могу попасть Беда, если бы я вырос до нескольких тысяч компаний, чтобы синхронизировать c ... возможно, предложения о том, что читать о syn c операциях и масштабируемости?

Большое спасибо. Приветствия.

Ответы [ 3 ]

2 голосов
/ 28 января 2020

Как уже отмечали другие, вы не можете использовать async с Parallel.ForEach. Однако вы можете сделать асинхронный код параллельным, запустив сразу все вызовы SyncBusiness и затем используя Task.WhenAll:

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
  var businessIds = (from x in _db.Poslookup
                     select x.BusinessId).Distinct();

  while (!stoppingToken.IsCancellationRequested)
  {
    var tasks = businessIds.Select(SyncBusiness).ToList();
    await Task.WhenAll(tasks);

    _logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);
    await Task.Delay(600000, stoppingToken);
  }
}

Я бы также рекомендовал сделать поиск в вашей базе данных асинхронным:

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
  while (!stoppingToken.IsCancellationRequested)
  {
    var businessIds = await (from x in _db.Poslookup
                       select x.BusinessId).Distinct().ToListAsync();

    var tasks = businessIds.Select(SyncBusiness).ToList();
    await Task.WhenAll(tasks);

    _logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);
    await Task.Delay(600000, stoppingToken);
  }
}

И последнее замечание состоит в том, что этот текущий код синхронизирует все предприятия, и затем ждет десять минут между своей работой. Если вы хотите, чтобы он запускался каждые 10 минут, то вы можете запустить таймер в начале метода:

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
  while (!stoppingToken.IsCancellationRequested)
  {
    _logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);
    var timerTask = Task.Delay(TimeSpan.FromMinutes(10), stoppingToken);
    var businessIds = await (from x in _db.Poslookup
                       select x.BusinessId).Distinct().ToListAsync();

    var tasks = businessIds.Select(SyncBusiness).ToList();
    tasks.Add(timerTask);
    await Task.WhenAll(tasks);
  }
}
1 голос
/ 28 января 2020

Из официальной документации: https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/how-to-write-a-simple-parallel-foreach-loop

l oop разбивает исходную коллекцию и планирует работу над несколькими потоками на основе системной среды. Чем больше процессоров в системе, тем быстрее работает параллельный метод. Для некоторых исходных коллекций последовательное l oop может быть быстрее, в зависимости от размера источника и вида работы, которую выполняет l oop.

Вы не можете запустить их все одновременно. Параллелизм всегда ограничен процессором и (гиперпоточность тоже помогает)

Подводные камни

Еще одно замечательное руководство, объясняющее многое о подводных камнях параллельного программирования: https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/potential-pitfalls-in-data-and-task-parallelism

Нужно избегать использования кода, не поддерживающего потоки, параллель не всегда быстрее (в зависимости от ситуации) и т. Д. c

Следите за тем, чтобы не соответствовать вашим требованиям. Если потоков тысячи, а обработка не была завершена в течение 10 минут, ваш следующий пакет не запустится. Вам нужно масштабировать до нескольких машин.

0 голосов
/ 28 января 2020

Что-то вроде:

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
      IEnumerable<string> businessIds = (from x in _db.Poslookup
                               select x.BusinessId).Distinct();

     // Want to multi-thread a sync for each of the businesses in businessIds
     Parallel.ForEach(businessIds, async i =>
     {
         await SyncBusiness(i, stoppingToken);
     });


    _logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);
}

private async Task SyncBusiness(string businessId, CancellationToken stoppingToken)
{
    await new HttpClient().GetAsync($"https://example.com/endpoint/{businessId}", stoppingToken);
}

Редактировать после комментария Питера Бонса. замените

Parallel.ForEach(businessIds, async i =>
         {
             await SyncBusiness(i, stoppingToken);
         });

на

// Want to multi-thread a sync for each of the businesses in businessIds
            IEnumerable<Task> tasks = businessIds.Select(i => SyncBusiness(i, stoppingToken));

            Task.WaitAll(tasks.ToArray());
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...