Почему несколько коротких задач в итоге получают один и тот же идентификатор? - PullRequest
0 голосов
/ 17 октября 2019

У меня есть список элементов для обработки, и я создаю задачу для каждого из них, а затем жду с помощью Task.WhenAny (). Я следую описанному здесь шаблону: Запуск нескольких асинхронных задач и их обработка по мере их выполнения .

Я изменил одну вещь: я использую HashSet<Task> вместо List<Task>. Но я замечаю, что все задачи в конечном итоге получают один и тот же идентификатор, и, таким образом, HashSet добавляет только одну из них, и, следовательно, я в конечном итоге ожидаю только одну задачу.

У меня есть рабочий пример здесьв dotnetfiddle: https://dotnetfiddle.net/KQN2ow

Также вставляем следующий код:

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace ReproTasksWithSameId
{
    public class Program
    {
        public static async Task Main(string[] args)
        {
            List<int> itemIds = new List<int>() { 1, 2, 3, 4 };
            await ProcessManyItems(itemIds);
        }

        private static async Task ProcessManyItems(List<int> itemIds)
        {
            //
            // Create tasks for each item and then wait for them using Task.WhenAny
            // Following Task.WhenAny() pattern described here: https://docs.microsoft.com/en-us/dotnet/csharp/programming-guide/concepts/async/start-multiple-async-tasks-and-process-them-as-they-complete
            // But replaced List<Task> with HashSet<Task>.
            //

            HashSet<Task> tasks = new HashSet<Task>();

            // We map the task ids to item ids so that we have enough info to log if a task throws an exception.
            Dictionary<int, int> taskIdToItemId = new Dictionary<int, int>();

            foreach (int itemId in itemIds)
            {
                Task task = ProcessOneItem(itemId);
                Console.WriteLine("Created task with id: {0}", task.Id);
                tasks.Add(task);
                taskIdToItemId[task.Id] = itemId;
            }

            // Add a loop to process the tasks one at a time until none remain.
            while (tasks.Count > 0)
            {
                // Identify the first task that completes.
                Task task = await Task.WhenAny(tasks);

                // Remove the selected task from the list so that we don't
                // process it more than once.
                tasks.Remove(task);

                // Get the item id from our map, so that we can log rich information.
                int itemId = taskIdToItemId[task.Id];

                try
                {
                    // Await the completed task.
                    await task;  // unwrap exceptions.
                    Console.WriteLine("Successfully processed task with id: {0}, itemId: {1}", task.Id, itemId);
                }
                catch (Exception ex)
                {
                    Console.WriteLine("Failed to process task with id: {0}, itemId: {1}. Just logging & eating the exception {1}", task.Id, itemId, ex);
                }
            }
        }
        private static async Task ProcessOneItem(int itemId)
        {
            // Assume this method awaits on some asynchronous IO.
            Console.WriteLine("item: {0}", itemId);
        }
    }
}

Вывод, который я получаю, таков:

item: 1
Created task with id: 1
item: 2
Created task with id: 1
item: 3
Created task with id: 1
item: 4
Created task with id: 1
Successfully processed task with id: 1, itemId: 4

Так что в основном программа завершается после ожиданиятолько первое задание.

  1. Почему несколько коротких заданий в итоге получают один и тот же идентификатор? Кстати, я также тестировал метод, который возвращает Task<TResult> вместо Task, и в этом случае он работает нормально.

  2. Есть ли лучший подход, который я могу использовать?

Ответы [ 3 ]

3 голосов
/ 17 октября 2019

Код вопроса является синхронным, поэтому выполняется только одно выполненное задание. async не заставляет что-то работать асинхронно, это синтаксический сахар, который позволяет использовать await для ожидания уже выполнения асинхронной операции для завершения без блокировки вызывающего потока.

Что касается примера документации, это то, что он есть. Пример документации, а не шаблон и, конечно, не то, что можно использовать в производстве, кроме простых случаев.

Что произойдет, если вы сможете сделать только 5 запросов одновременно, чтобы избежать переполнения вашей сети или процессора? Вам нужно будет загрузить только фиксированное количество записей для этого. Что если вам нужно обработать загруженные данные? Что если список URL-адресов поступает из другого потока?

Эти проблемы решаются с помощью одновременных контейнеров, шаблонов pub / sub и специально созданных классов Dataflow и Channel.

Поток данных

Старые классы Dataflow обеспечивают автоматическую буферизацию ввода-вывода и автоматическую обработку рабочих задач. Весь код загрузки можно заменить на ActionBlock :

var client=new HttpClient(....);
//Cancel if the process takes longer than 30 minutes
var cts=new CancellationTokenSource(TimeSpan.FromMinutes(30));
var options=new ExecutionDataflowBlockOptions(){
    MaxDegreeOfParallelism=10,
    BoundedCapacity=5,
    CancellationToken=cts.Token
};
var block=new ActionBlock<string>(url=>ProcessUrl(url,client,cts.Token));

Вот и все. Блок будет использовать до 10 одновременных задач для выполнения до 10 одновременных загрузок. Он будет хранить до 5 URL в памяти (иначе все буферизуется). Если входной буфер заполнится, отправка элементов в блок будет выполняться асинхронно, что предотвращает медленную загрузку из-за переполнения памяти URL-адресами.

В том же или другом потоке «издатель» URL может публиковатьмного URL-адресов по своему усмотрению, сколько угодно.

foreach(var url in urls)
{
    await block.SendAsync(url);
}
//Tell the block we're done
block.Complete();
//Wait until all downloads are complete
await block.Completion;

Мы можем использовать другие блоки, такие как TransformBlock, для получения вывода, передачи его в другой блок и, таким образом, для создания параллельного конвейера обработки. Допустим, у нас есть два метода, DownloadURL и ParseResponse вместо просто ProcessUrl:

Task<string> DownloadUrlAsync(string url,HttpClient client)
{
    return client.GetStringAsync(url);
}

void ParseResponse(string content)
{
    var object=JObject.Parse();
    DoSomethingWith(object);
}

Мы могли бы создать отдельный блок для каждого шага в конвейере с различными DOP и буферами:

var dlOptions=new ExecutionDataflowBlockOptions(){
    MaxDegreeOfParallelism=5,
    BoundedCapacity=5,
    CancellationToken=cts.Token
};
var downloader=new TransformBlock<string,string>(
                   url=>DownloadUrlAsync(url,client),
                   dlOptions);

var parseOptions = new ExecutionDataflowBlockOptions(){
    MaxDegreeOfParallelism=10,
    BoundedCapacity=2,
    CancellationToken=cts.Token
};
var parser=new ActionBlock<string>(ParseResponse);

downloader.LinkTo(parser, new DataflowLinkOptions{PropageateCompletion=true});

Мы можем публиковать URL-адреса для загрузчика и ждать, пока все они будут проанализированы. Используя разные DOP и возможности, мы можем сбалансировать количество задач загрузчика и анализатора, чтобы загрузить столько URL-адресов, сколько мы можем проанализировать и обработать, например, медленные загрузки или большие ответы.

foreach(var url in urls)
{
    await downloader.SendAsync(url);
}
//Tell the block we're done
downloader.Complete();
//Wait until all urls are parsed
await parser.Completion;

Каналы

System.Threading.Channels представляет каналы в стиле Go. Это на самом деле низкоуровневые концепции, которые блокирует поток данных. Если бы каналы были доступны еще в 2012 году, они были бы написаны с использованием каналов.

Эквивалентный метод загрузки будет выглядеть следующим образом:

ChannelReader<string> Downloader(ChannelReader<string> ulrs,HttpClient client,
                                 int capacity,CancellationToken token=default)
{
    var channel=Channel.CreateBounded(capacity);
    var writer=channel.Writer;

    _ = Task.Run(async ()=>{
        await foreach(var url in urls.ReadAsStreamAsync(token))
        {
            var response=await client.GetStringAsync(url);
            await writer.WriteAsync(response);
        }
    }).ContinueWith(t=>writer.Complete(t.Exception));
    return channel.Reader;
}

Это более многословно, но позволяет нам делать такие вещи, как создание HttpClient в методе и его повторное использование. ,Использование ChannelReader в качестве входных и выходных данных может показаться странным, но теперь мы можем связать такие методы, просто передав считыватель в качестве входных данных другому методу.

"Волшебство" заключается в том, что мы создаем рабочую задачу, которая ожидает обработки сообщений и немедленно возвращает читателя. Всякий раз, когда получается результат, он отправляется на канал и следующий шаг в конвейере.

Чтобы использовать несколько рабочих задач, мы можем использовать Enumerable.Range, чтобы запустить многие из них, и Task.WhenAny, чтобы закрытьканал, когда все каналы готовы:

ChannelReader<string> Downloader(ChannelReader<string> ulrs,HttpClient client,
                                 int capacity,int dop,CancellationToken token=default)
{
    var channel=Channel.CreateBounded(capacity);
    var writer=channel.Writer;

    var tasks  = Enumerable
                   .Range(0,dop)
                   .Select(_=> Task.Run(async ()=>{
                       await foreach(var url in urls.ReadAllAsync(token))
                       {
                           var response=await client.GetStringAsync(url);
                           await writer.WriteAsync(response);
                       }
                    });
    _=Task.WhenAll(tasks)
          .ContinueWith(t=>writer.Complete(t.Exception));
    return channel.Reader;
}

Издатели могут создать свой собственный канал и передать считыватель методу Downloader. Им также не нужно ничего публиковать заранее:

var channel=Channel.CreateUnbounded<string>();
var dlReader=Downloader(channel.Reader,client,5,5);
foreach(var url in someUrlList)
{
    await channel.Writer.WriteAsync(url);
}
channel.Writer.Complete();

Свободные конвейеры

Это так часто, что кто-то может создать метод расширения для этого. Например, чтобы преобразовать IList в Channel<T>, нам не нужно ждать, так как все результаты уже доступны:

ChannelReader<T> Generate<T>(this IEnumerable<T> source)
{
    var channel=Channel.CreateUnbounded<T>();
    foreach(var item in source)
    {
        channel.Writer.TryWrite(T);
    }
    channel.Writer.Complete();
    return channel.Reader;
}

Если мы также преобразуем Downloader в метод расширения, мыможно использовать:

var pipeline= someUrls.Generate() 
                      .Downloader(client,5,5);
2 голосов
/ 17 октября 2019

Это потому, что ProcessOneItem не является асинхронным.

Вы должны увидеть следующее предупреждение:

В этом асинхронном методе отсутствуют операторы 'await' и он будет работать синхронно. Подумайте об использовании оператора «await» для ожидания неблокирующих вызовов API или «await Task.Run (...)» для выполнения работы с процессором в фоновом потоке.

После добавления await (...) к ProcessOneItem возвращаемое задание будет иметь unique-ish id.

0 голосов
/ 17 октября 2019

Из документации свойства Task.Id:

Идентификаторы задач назначаются по запросу и не обязательно представляют порядок, в котором создаются экземпляры задач. Обратите внимание, что, хотя коллизии очень редки, идентификаторы задач не гарантируются как уникальные.

Из того, что я понимаю, это свойство в основном используется для целей отладки. Вам, вероятно, следует избегать зависимости от него для производственного кода.

...