Фабрика задач для каждого цикла с ожиданием - PullRequest
0 голосов
/ 31 декабря 2018

Я новичок в задачах и у меня есть вопрос относительно использования.Срабатывает ли Task.Factory для всех элементов в цикле foreach или в блоке 'await', что делает программу однопоточной?Если я думаю об этом правильно, цикл foreach запускает все задачи и .GetAwaiter (). GetResult ();блокирует основной поток до завершения последнего задания.

Также мне нужны анонимные задачи для загрузки данных.Будет ли это правильной реализацией?Я не имею в виду обработку исключений, поскольку это просто пример.

Для ясности я загружаю данные в базу данных из внешнего API.Этот использует базу данных FRED.(https://fred.stlouisfed.org/),, но у меня есть несколько, которые я нажму, чтобы завершить весь перенос (возможно, 200 тыс. Точек данных). Как только они будут сделаны, я обновляю таблицы, обновляю расчеты рынка и т. Д. Некоторые из них выполняются в режиме реального времени, а некоторые -это конец дня. Я также хотел бы сказать, что в настоящее время у меня все работает в докере, но я работал над обновлением кода, используя задачи для улучшения выполнения.

class Program
{
    private async Task SQLBulkLoader() 
    {

        foreach (var fileListObj in indicators.file_list)
        {
            await Task.Factory.StartNew(  () =>
            {

                string json = this.GET(//API call);

                SeriesObject obj = JsonConvert.DeserializeObject<SeriesObject>(json);

                DataTable dataTableConversion = ConvertToDataTable(obj.observations);
                dataTableConversion.TableName = fileListObj.series_id;

                using (SqlConnection dbConnection = new SqlConnection("SQL Connection"))
                {
                    dbConnection.Open();
                    using (SqlBulkCopy s = new SqlBulkCopy(dbConnection))
                    {
                      s.DestinationTableName = dataTableConversion.TableName;
                      foreach (var column in dataTableConversion.Columns)
                          s.ColumnMappings.Add(column.ToString(), column.ToString());
                      s.WriteToServer(dataTableConversion);
                    }

                  Console.WriteLine("File: {0} Complete", fileListObj.series_id);
                }
             });
        }            
    }

    static void Main(string[] args)
    {
        Program worker = new Program();
        worker.SQLBulkLoader().GetAwaiter().GetResult();
    }
}

Ответы [ 6 ]

0 голосов
/ 31 декабря 2018

Мое предположение: большинство трудоемких операций будут получать данные с помощью операции GET и фактический вызов WriteToServer с использованием SqlBulkCopy.Если вы посмотрите на этот класс, то увидите, что существует нативный асинхронный метод WriteToServerAsync method ( здесь ).Всегда используйте их перед созданием Задач самостоятельно, используя Task.Run.

То же самое относится и к вызову http GET.Для этого вы можете использовать собственные HttpClient.GetAsync ( документы здесь ).

Таким образом вы можете переписать свой код следующим образом:

private async Task ProcessFileAsync(string series_id)
{
    string json = await GetAsync();

    SeriesObject obj = JsonConvert.DeserializeObject<SeriesObject>(json);

    DataTable dataTableConversion = ConvertToDataTable(obj.observations);
    dataTableConversion.TableName = series_id;

    using (SqlConnection dbConnection = new SqlConnection("SQL Connection"))
    {
        dbConnection.Open();
        using (SqlBulkCopy s = new SqlBulkCopy(dbConnection))
        {
            s.DestinationTableName = dataTableConversion.TableName;
            foreach (var column in dataTableConversion.Columns)
                s.ColumnMappings.Add(column.ToString(), column.ToString());
            await s.WriteToServerAsync(dataTableConversion);
        }

        Console.WriteLine("File: {0} Complete", series_id);
    }
}

private async Task SQLBulkLoaderAsync()
{
    var tasks = indicators.file_list.Select(f => ProcessFileAsync(f.series_id));
    await Task.WhenAll(tasks);
}

Обе операции (httpвызов и вызов сервера sql) являются вызовами ввода / вывода.При использовании нативного шаблона async / await даже не будет создан или использован поток, см. этот вопрос для более подробного объяснения.Вот почему для операций, связанных с вводом-выводом, вам никогда не придется использовать Task.Run (или Task.Factory.StartNew. Но учтите, что Task.Run является рекомендуемым подходом ).

Sidenote: ifвы используете HttpClient в цикле, прочитайте это о том, как правильно его использовать.

Если вам нужно ограничить количество параллельных действий, вы также можете использовать TPLПоток данных , поскольку он очень хорошо работает с операциями, связанными с вводом-выводом на основе задач.Затем SQLBulkLoaderAsync следует изменить на (оставив метод ProcessFileAsync из предыдущего ответа без изменений):

private async Task SQLBulkLoaderAsync()
{
    var ab = new ActionBlock<string>(ProcessFileAsync, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

    foreach (var file in indicators.file_list)
    {
        ab.Post(file.series_id);
    }

    ab.Complete();
    await ab.Completion;
}
0 голосов
/ 31 декабря 2018

Это типичная проблема, которую C # 8.0 Async Streams собираются решить очень скоро.

До выхода C # 8.0 вы можете использовать библиотеку AsyncEnumarator :

using System.Collections.Async;

class Program
{
    private async Task SQLBulkLoader() {

        await indicators.file_list.ParallelForEachAsync(async fileListObj =>
        {
            ...
            await s.WriteToServerAsync(dataTableConversion);
            ...
        },
        maxDegreeOfParalellism: 3,
        cancellationToken: default);
    }

    static void Main(string[] args)
    {
        Program worker = new Program();
        worker.SQLBulkLoader().GetAwaiter().GetResult();
    }
}

Я не рекомендую использовать Parallel.ForEach и Task.WhenAll, поскольку эти функции не предназначены для асинхронной потоковой передачи.

0 голосов
/ 31 декабря 2018

Вы хотите добавить каждую задачу в коллекцию, а затем использовать Task.WhenAll для ожидания всех задач в этой коллекции:

private async Task SQLBulkLoader() 
{ 
  var tasks = new List<Task>();
  foreach (var fileListObj in indicators.file_list)
  {
    tasks.Add(Task.Factory.StartNew( () => { //Doing Stuff }));
  }

  await Task.WhenAll(tasks.ToArray());
}
0 голосов
/ 31 декабря 2018

Ваше ожидание задания, возвращенного из Task.Factory.StartNew, делает его фактически однопоточным.Вы можете увидеть простую демонстрацию этого на следующем коротком примере LinqPad:

for (var i = 0; i < 3; i++)
{
    var index = i;
    $"{index} inline".Dump();
    await Task.Run(() =>
    {
        Thread.Sleep((3 - index) * 1000);
        $"{index} in thread".Dump();
    });
}

Здесь мы меньше ждем, пока продвигаемся по циклу.Выходные данные:

0 inline
0 в потоке
1 inline
1 в потоке
2 inline
2 в потоке

Если вы удалите await перед StartNew, вы увидите, что он работает параллельно.Как уже упоминали другие, вы, безусловно, можете использовать Parallel.ForEach, но для демонстрации более ручного выполнения вы можете рассмотреть решение, подобное следующему:

var tasks = new List<Task>();

for (var i = 0; i < 3; i++) 
{
    var index = i;
    $"{index} inline".Dump();
    tasks.Add(Task.Factory.StartNew(() =>
    {
        Thread.Sleep((3 - index) * 1000);
        $"{index} in thread".Dump();
    }));
}

Task.WaitAll(tasks.ToArray());

Теперь обратите внимание на результат:

0 inline
1 inline
2 inline
2 in thread
1 in thread
0 in thread

0 голосов
/ 31 декабря 2018

Используйте цикл Parallel.ForEach, чтобы включить параллелизм данных для любого источника System.Collections.Generic.IEnumerable<T>.

// Method signature: Parallel.ForEach(IEnumerable<TSource> source, Action<TSource> body)
    Parallel.ForEach(fileList, (currentFile) => 
    {

       //Doing Stuff

      Console.WriteLine("Processing {0} on thread {1}", currentFile, Thread.CurrentThread.ManagedThreadId);
    });
0 голосов
/ 31 декабря 2018

Почему вы не попробовали это :), эта программа не будет запускать параллельные задачи (в foreach), она будет блокировать, но логика в задаче будет выполняться в отдельном потоке от пула потоков (только один за раз, ноосновной поток будет заблокирован).

Правильный подход в вашей ситуации - использовать Paraller.ForEach Как я могу преобразовать этот код foreach в Parallel.ForEach?

...