Могу ли я запустить несколько медленных процессов в фоновом режиме, чтобы несколько задач могли выполняться параллельно? - PullRequest
0 голосов
/ 05 февраля 2019

У меня есть консольное приложение, написанное с использованием C# в верхней части платформы Core .NET 2.2.

Мое приложение позволяет мне запускать длительные задания администратора с помощью планировщика задач Windows.

Одно из заданий администратора выполняет вызов через веб-API, который загружает множество файлов перед их загрузкой в ​​хранилище BLOB-объектов Azure.Вот логические шаги, которые мой код должен выполнить, чтобы выполнить работу.

  1. Вызовите удаленный API, который отвечает сообщением Mime, где каждое сообщение представляет файл.
  2. Анализироватьсообщения Mime и преобразуйте каждое сообщение в MemoryStream, создавая коллекцию MemoryStream

Как только у меня будет коллекция с несколькими 1000+ MemoryStream, я хочу записать каждое Stream в AzureХранение BLOB-объектов.Поскольку запись в удаленное хранилище выполняется медленно, я надеюсь, что смогу выполнить каждую итерацию записи, используя свой собственный процесс или поток.Это позволит мне одновременно выполнять параллельно более 1000 потоков одновременно, вместо того, чтобы ждать результата каждой операции записи.Каждый поток будет отвечать за регистрацию любых ошибок, которые могут возникнуть в процессе записи / загрузки.Любые зарегистрированные ошибки будут обрабатываться с использованием другой работы, поэтому мне не нужно беспокоиться о повторных попытках.

Насколько я понимаю, вызов кода, который асинхронно записывает / передает поток, сделает именно это.Другими словами, я бы сказал, что «Stream выполняет его и запускает столько времени, сколько потребуется. Меня не волнует результат, пока задача будет выполнена».

Хотятестируя, я обнаружил, что мое понимание вызова async несколько неверно.У меня сложилось впечатление, что при вызове метода, определенного с async, он будет выполняться в фоновом потоке / работнике, пока этот процесс не будет завершен.Но мое понимание не удалось, когда я протестировал код.Мой код показал мне, что без добавления ключевого слова await код async действительно никогда не выполняется.В то же время, когда добавляется ключевое слово await, код будет ждать завершения процесса до его продолжения.Другими словами, добавление await для моей потребности потеряет цель асинхронного вызова метода.

Вот урезанная версия моего кода для объяснения того, что я пытаюсь выполнить

public async Task Run()
{
    // This gets populated after calling the web-API and parsing out the result
    List<Stream> files = new List<MemoryStream>{.....};

    foreach (Stream file in files)
    {
        // This code should get executed in the background without having to await the result
        await Upload(file);
    }
}

// This method is responsible of upload a stream to a storage and log error if any
private async Task Upload(Stream stream)
{
    try
    {
        await Storage.Create(file, GetUniqueName());
    } 
    catch(Exception e)
    {
        // Log any errors
    }
}

Исходя из приведенного выше кода, вызов await Upload(file); работает и загрузит файл, как и ожидалось.Однако, поскольку я использую await при вызове метода Upload(), мой цикл НЕ будет переходить к следующей итерации, пока код загрузки не завершится.В то же время, удаляя ключевое слово await, цикл не ожидает процесса загрузки, но на самом деле Stream никогда не записывает данные в хранилище, как будто я никогда не вызывал код.

Как выполнить несколько Upload метод параллельно, чтобы у меня был один поток, работающий на одну загрузку в фоновом режиме?

Ответы [ 5 ]

0 голосов
/ 05 февраля 2019

Вы можете преобразовать свой код в функцию Azure и позволить ей справиться с большей частью параллелизма, масштабирования и загрузки в хранилище BLOB-объектов Azure.

Вы можете использоватьТриггер Http или триггер служебной шины для запуска каждой задачи загрузки, обработки и загрузки.

0 голосов
/ 05 февраля 2019

Другие ответы хороши, однако другой подход к вашему TPL DataFlow доступен в Nuget от https://www.nuget.org/packages/System.Threading.Tasks.Dataflow/

public static async Task DoWorkLoads(List<Something> results)
{
   var options = new ExecutionDataflowBlockOptions
                     {
                        MaxDegreeOfParallelism = 50
                     };

   var block = new ActionBlock<Something>(MyMethodAsync, options);

   foreach (var result in results)
      block.Post(result );

   block.Complete();
   await block.Completion;

}

...

public async Task MyMethodAsync(Something result)
{       
   //  Do async work here
}

Преимущество потока данных

  1. Естественно ли это работает с async, как и WhenAll решения на основе задач
  2. , его также можно подключить к более широкому набору задач
    • Вы можете повторить ошибки, отправив их обратно
    • Добавить любые вызовы предварительной обработки в более ранние блоки
  3. Вы можете ограничить MaxDegreeOfParallelism, если регулирование вызывает беспокойство
  4. Вы можете сделатьболее сложные конвейеры, отсюда и название DataFlow
0 голосов
/ 05 февраля 2019

Скорее всего, вам понадобится:

var tasks = files.Select(Upload);
await Task.WhenAll(tasks);

Просто обратите внимание, что будет создано столько задач, сколько у вас есть файлов, что может привести к сбою процесса / машины, если их будет слишком много.См. Иметь набор Задач с запущенным только X одновременно как пример того, как решить эту проблему.

0 голосов
/ 05 февраля 2019

Я надеюсь, что смогу выполнить каждую итерацию записи, используя собственный процесс или поток.

Это не самый лучший способ сделать это.Процессы и потоки являются ограниченными ресурсами.Ваш ограничивающий фактор ожидает в сети выполнения действия.

То, что вы хотите сделать, это просто что-то вроде:

var tasks = new List<Task>(queue.Count);

while (queue.Count > 0)
{
  var myobject = Queue.Dequeue();
  var task = blockBlob.UploadFromByteArrayAsync(myobject.content, 0, myobject.content.Length);
  tasks.Add(task);
}
await Task.WhenAll(tasks);

Здесь мы находимсяпросто создавая задачи так быстро, как мы можем, а затем ждите их завершения.Мы просто позволим .Net Framework позаботиться обо всем остальном.

Здесь важно то, что потоки не улучшают скорость ожидания сетевых ресурсов.Задачи - это способ делегировать то, что необходимо сделать, из рук потоков, чтобы у вас было больше потоков для выполнения чего угодно (например, запуска новой загрузки или ответа на законченную загрузку).Если поток просто ожидает завершения загрузки, это потерянный ресурс.

0 голосов
/ 05 февраля 2019

Преобразование списка в список задач «Загрузить» и ожидание их всех с помощью Task.WhenAll():

public async Task Run()
{
    // This gets populated after calling the web-API and parsing out the result
    List<Stream> files = new List<MemoryStream>{.....};
    var tasks = files.Select(Upload);

    await Task.WhenAll(tasks);
}

См. этот пост для некоторыхбольше информации о задачах / жду.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...