Подходящий шаблон для ожидания Task.WhenAny (Список <T>), когда в список могут быть добавлены другие задачи. - PullRequest
3 голосов
/ 23 декабря 2019

Невозможно дождаться List<Task>, который меняется, потому что Task.WhenAny(List<Task>) берет копию List<Task>.

Какой шаблон подходит для

List<Task> taskList = new List<Task>();

await Task.WhenAny(taskList);

Когда TaskList можетесть ли другие задачи, добавленные к нему после первого вызова метода WhenAny?

Полный демонстрационный код ниже, демонстрирующий проблему.

    static readonly List<Task<int>> taskList = new List<Task<int>>();
    static readonly Random rnd = new Random(1);

    static async Task<int> RunTaskAsync(int taskID,int taskDuration)
    {
        await Task.Yield();
        Console.WriteLine("Starting Task: {0} with a duration of {1} seconds", taskID, taskDuration / 1000);
        await Task.Delay(taskDuration);  // mimic some work
        return taskID;
    }
    static async Task AddTasksAsync(int numTasks, int minDelay, int maxDelay)
    {
        // Add numTasks asyncronously to the taskList
        // First task is added Syncronously and then we yield the adds to a worker

        taskList.Add(RunTaskAsync(1, 60000)); // Make the first task run for 60 seconds
        await Task.Delay(5000); // wait 5 seconds to ensure that the WhenAny is started with One task

        // remaing task run's are Yielded to a worker thread
        for (int i = 2; i <= numTasks; i++)
        {
            await Task.Delay(rnd.Next(minDelay, maxDelay));
            taskList.Add(RunTaskAsync(i, rnd.Next(5, 30) * 1000));
        }
    }
    static async Task Main(string[] args)
    {
        Stopwatch sw = new Stopwatch(); sw.Start();

        // Start a Fire and Forget Task to create some running tasks
        var _ = AddTasksAsync(10, 1, 3000);

        // while there are tasks to complete use the main thread to process them as they comeplete
        while(taskList.Count > 0)
        {
            var t = await Task.WhenAny(taskList);
            taskList.Remove(t);
            var i = await t;
            Console.WriteLine("Task {0} found to be completed at: {1}",i,sw.Elapsed);
        }

        // All tasks have completed sucessfully - exit main thread
    }

Консольный вывод, показывающий, что цикл WhenAny () обнаружил все остальныезадачи выполнены, только после нахождения и удаления 60-секундной задачи.

Starting Task: 1 with a duration of 60 seconds
Starting Task: 2 with a duration of 7 seconds
Starting Task: 3 with a duration of 24 seconds
Starting Task: 4 with a duration of 15 seconds
Starting Task: 5 with a duration of 28 seconds
Starting Task: 6 with a duration of 21 seconds
Starting Task: 7 with a duration of 11 seconds
Starting Task: 8 with a duration of 29 seconds
Starting Task: 9 with a duration of 21 seconds
Starting Task: 10 with a duration of 20 seconds
Task 1 found to be completed at: 00:01:00.1305811
Task 2 found to be completed at: 00:01:00.1312951
Task 3 found to be completed at: 00:01:00.1315689
Task 4 found to be completed at: 00:01:00.1317623
Task 5 found to be completed at: 00:01:00.1319427
Task 6 found to be completed at: 00:01:00.1321225
Task 7 found to be completed at: 00:01:00.1323002
Task 8 found to be completed at: 00:01:00.1324379
Task 9 found to be completed at: 00:01:00.1325962
Task 10 found to be completed at: 00:01:00.1327377

Спасибо!

Ответы [ 2 ]

1 голос
/ 23 декабря 2019

Существует проблема с кодом, который вы показали, а именно, он не имеет разумного канала связи между работником и создателем задачи. Вам нужен какой-то механизм обмена сообщениями, чтобы уведомить работника о новых задачах (и когда задач больше нет), чтобы он мог на него реагировать. Это то, что вы должны выяснить для своей параллельной системы, и точная реализация имеет отношение к вопросу, поэтому я просто предположу, что у нас есть методы OnTaskAdded(Task task) и OnEnd() в нашем работнике.

Из чеговы говорите, что вы не хотите действительно ждать, пока какая-либо задача завершится, а скорее для каждой задачи выполнить что-то, когда она завершится. СМОТРИТЕ ОБНОВЛЕННЫЙ ОТВЕТ НИЖЕ. Этого можно достичь с помощью ContinueWith:

class Worker
{
    private List<Task> _tasks = new List<Task>();
    private readonly Stopwatch _stopwatch = new Stopwatch();

    // Start the stopwatch in the constructor or in some kind of a StartProcessing method.

    void OnTaskAdded(Task<int> task)
    {
        var taskWithContinuation = task.ContinueWith(t =>
            Console.WriteLine("Task {0} found to be completed at: {1}", t.Result, _stopwatch.Elapsed));
        _tasks.Add(taskWithContinuation);
    }

    async Task OnEndAsync()
    {
        // We're finishing work and there will be no more tasks, it's safe to await them all now.
        await Task.WhenAll(_tasks);
    }
}

РЕДАКТИРОВАТЬ: После всего этого морализаторского разговора о том, как обеспечить разумный конвейер обмена сообщениями, я рассчитывал, что на самом деле могу дать вам быструю и грязную реализацию простотак что вы можете видеть, что это работает:

// DISCLAIMER: NOT PRODUCTION CODE!!!
public static async Task Main()
{
    Stopwatch sw = new Stopwatch(); sw.Start();

    // Start a Fire and Forget Task to create some running tasks
    var _ = AddTasksAsync(10, 1, 3000);
    var internalList = new List<Task>();

    // while there are tasks to complete use the main thread to process them as they comeplete
    var i = 0;
    while (i < 10)
    {
        while (taskList.Count <= i)
        {
            // No new tasks, check again after a delay -- THIS IS VERY BAD!
            await Task.Delay(100);
        }
        Console.WriteLine("Task {0} intercepted at: {1}", i + 1, sw.Elapsed);
        var taskWithContinuation = taskList[i].ContinueWith(t =>
            Console.WriteLine("Task {0} found to be completed at: {1}", t.Result, sw.Elapsed));
        internalList.Add(taskWithContinuation);
        ++i;
    }
    await Task.WhenAll(internalList);
}

Позвольте мне еще раз подчеркнуть: это , а не код качества производства! Активно жду новых заданий, тьфу. Его вывод выглядит примерно так:

Task 1 intercepted at: 00:00:00.0495570
Starting Task: 1 with a duration of 60 seconds
Starting Task: 2 with a duration of 7 seconds
Task 2 intercepted at: 00:00:05.8459622
Starting Task: 3 with a duration of 24 seconds
Task 3 intercepted at: 00:00:07.2626124
Starting Task: 4 with a duration of 15 seconds
Task 4 intercepted at: 00:00:09.2257285
Starting Task: 5 with a duration of 28 seconds
Task 5 intercepted at: 00:00:10.3058738
Starting Task: 6 with a duration of 21 seconds
Task 6 intercepted at: 00:00:10.6376981
Starting Task: 7 with a duration of 11 seconds
Task 7 intercepted at: 00:00:10.7507146
Starting Task: 8 with a duration of 29 seconds
Task 8 intercepted at: 00:00:11.7107754
Task 2 found to be completed at: 00:00:12.8111589
Starting Task: 9 with a duration of 21 seconds
Task 9 intercepted at: 00:00:13.7883430
Starting Task: 10 with a duration of 20 seconds
Task 10 intercepted at: 00:00:14.6707959
Task 7 found to be completed at: 00:00:21.6692276
Task 4 found to be completed at: 00:00:24.2125638
Task 3 found to be completed at: 00:00:31.2276640
Task 6 found to be completed at: 00:00:31.5908324
Task 10 found to be completed at: 00:00:34.5585143
Task 9 found to be completed at: 00:00:34.7053864
Task 5 found to be completed at: 00:00:38.2616534
Task 8 found to be completed at: 00:00:40.6372696
Task 1 found to be completed at: 00:01:00.0720695

Вы можете видеть, что строки немного перетасованы из-за характера многопоточной работы, но временные метки точны.

ОБНОВЛЕНИЕ:

Ну, я довольно тупой, я только что пригласил вас на анти-паттерн. Использование ContinueWith опасно , более того, оно слишком сложно - введено async / await, чтобы освободить нас от продолжения планирования вручную. Вы можете просто обернуть Task<int> с помощью операции, которая await s и записывает время .

class Worker
{
    private List<Task> _tasks = new List<Task>();
    private readonly Stopwatch _stopwatch = new Stopwatch();

    // Start the stopwatch in the constructor or in some kind of a StartProcessing method.

    void OnTaskAdded(Task<int> task)
    {
        var taskWithContinuation = ContinueWithLog(task);
        _tasks.Add(taskWithContinuation);
    }

    async Task OnEndAsync()
    {
        // We're finishing work and there will be no more tasks, it's safe to await them all now.
        await Task.WhenAll(_tasks);
    }

    private Task ContinueWithLog(Task<int> task)
    {
        var i = await source;
        Console.WriteLine("Task {0} found to be completed at: {1}", i, sw.Elapsed);
    }
}

Используя ваш пример кода для быстрого и грязного PoC:

class Program
{
    static readonly List<Task<int>> taskList = new List<Task<int>>();
    static readonly Random rnd = new Random(1);
    static readonly Stopwatch sw = new Stopwatch();

    static async Task<int> RunTaskAsync(int taskID, int taskDuration)
    {
        await Task.Yield();
        Console.WriteLine("Starting Task: {0} with a duration of {1} seconds", taskID, taskDuration / 1000);
        await Task.Delay(taskDuration);  // mimic some work
        return taskID;
    }
    static async Task AddTasksAsync(int numTasks, int minDelay, int maxDelay)
    {
        // Add numTasks asyncronously to the taskList
        // First task is added Syncronously and then we yield the adds to a worker

        taskList.Add(RunTaskAsync(1, 60000)); // Make the first task run for 60 seconds
        await Task.Delay(5000); // wait 5 seconds to ensure that the WhenAny is started with One task

        // remaing task run's are Yielded to a worker thread
        for (int i = 2; i <= numTasks; i++)
        {
            await Task.Delay(rnd.Next(minDelay, maxDelay));
            taskList.Add(RunTaskAsync(i, rnd.Next(5, 30) * 1000));
        }
    }

    public static async Task ContinueWithLog(Task<int> source)
    {
        var i = await source;
        Console.WriteLine("Task {0} found to be completed at: {1}", i, sw.Elapsed);
    }

    public static async Task Main()
    {
        sw.Start();

        // Start a Fire and Forget Task to create some running tasks
        var _ = AddTasksAsync(10, 1, 3000);
        var internalList = new List<Task>();

        // while there are tasks to complete use the main thread to process them as they comeplete
        var i = 0;
        while (i < 10)
        {
            while (taskList.Count <= i)
            {
                // No new tasks, check again after a delay -- THIS IS VERY BAD!
                await Task.Delay(100);
            }
            Console.WriteLine("Task {0} intercepted at: {1}", i + 1, sw.Elapsed);
            internalList.Add(ContinueWithLog(taskList[i]));
            ++i;
        }
        await Task.WhenAll(internalList);
    }
}

Вывод:

Starting Task: 1 with a duration of 60 seconds
Task 1 intercepted at: 00:00:00.0525006
Starting Task: 2 with a duration of 7 seconds
Task 2 intercepted at: 00:00:05.8551382
Starting Task: 3 with a duration of 24 seconds
Task 3 intercepted at: 00:00:07.2687049
Starting Task: 4 with a duration of 15 seconds
Task 4 intercepted at: 00:00:09.2404507
Starting Task: 5 with a duration of 28 seconds
Task 5 intercepted at: 00:00:10.3325019
Starting Task: 6 with a duration of 21 seconds
Task 6 intercepted at: 00:00:10.6654663
Starting Task: 7 with a duration of 11 seconds
Task 7 intercepted at: 00:00:10.7809841
Starting Task: 8 with a duration of 29 seconds
Task 8 intercepted at: 00:00:11.7576237
Task 2 found to be completed at: 00:00:12.8151955
Starting Task: 9 with a duration of 21 seconds
Task 9 intercepted at: 00:00:13.7228579
Starting Task: 10 with a duration of 20 seconds
Task 10 intercepted at: 00:00:14.5829039
Task 7 found to be completed at: 00:00:21.6848699
Task 4 found to be completed at: 00:00:24.2089671
Task 3 found to be completed at: 00:00:31.2300136
Task 6 found to be completed at: 00:00:31.5847257
Task 10 found to be completed at: 00:00:34.5550722
Task 9 found to be completed at: 00:00:34.6904076
Task 5 found to be completed at: 00:00:38.2835777
Task 8 found to be completed at: 00:00:40.6445029
Task 1 found to be completed at: 00:01:00.0826952

Это идиоматический способ добиться того, чего вы хотите. Извините, что сначала ввел вас в заблуждение, введя ContinueWith, это ненужно и подвержено ошибкам, теперь мы оба знаем.

0 голосов
/ 23 декабря 2019

A List<Task> не подходит для такого рода работ, поскольку не поддерживает понятие Завершение . Таким образом, вы не сможете определить, есть ли еще задачи, которые нужно добавить в список, чтобы вы могли перестать ждать. Однако существует несколько альтернатив.

  1. BlockingCollection<Task>. Производитель вызывает методы Add и, наконец, CompleteAdding, чтобы сообщить о завершении добавления задач. Потребитель просто перечисляет GetConsumingEnumerable. Очень простой, но блокирующий по своей природе (не асинхронный).
  2. BufferBlock<Task>. Производитель вызывает методы SendAsync и, наконец, Complete, чтобы сообщить о завершении добавления задач. Потребитель перечисляет асинхронно, используя методы OutputAvailableAsync и TryReceive. Требуется пакет TPL Dataflow (для .NET Framework он включен в .NET Core).
  3. Channel<Task>. Производитель вызывает методы Writer.WriteAsync и, наконец, Writer.Complete, чтобы сообщить о завершении добавления задач. Потребитель перечисляет асинхронно, используя методы Reader.WaitToReadAsync и Reader.TryRead. Требуется пакет System.Threading.Channels (для .NET Framework он включен в .NET Core).
  4. IObservable<Task> + IObserver<Task> пара. Наблюдатель подписывается на наблюдаемое, а затем начинает получать уведомления о новых задачах. Последнее уведомление - это onCompleted(), которое сигнализирует о том, что больше уведомлений не будет. Библиотека Reactive Extensions включает в себя тон методов для манипулирования наблюдаемыми, и одним из них является оператор Merge, который можно использовать для ожидания всех задач, используя тот факт, чтоTask<T> можно преобразовать в IObservable<T>, который выдает одиночное onNext уведомление. Этот подход может показаться довольно эксцентричным, и, вероятно, он не стоит вложений в изучение этой технологии (парадигмы реактивного программирования), если только вы не имеете дело с поступающими потоками данных, которые вы хотели бы фильтровать, преобразовывать, объединять и т. Д.

Обновление: В ретроспективе первые три параметра нельзя использовать как есть, поскольку вы также хотите дождаться выполнения задач. Поэтому сейчас я предлагаю использовать TransformBlock<Task, Task> вместо BufferBlock<Task>.

var block = new TransformBlock<Task, Task>(async task =>
{
    try
    {
        await task;
    }
    catch { } // suppress exceptions
    return task;
});

Пример производителя, который добавляет задачи в блок:

var producer = Task.Run(async () =>
{
    for (int i = 1; i <= 10; i++)
    {
        await Task.Delay(100);
        Console.WriteLine($"Sending {i}");
        await block.SendAsync(Task.Delay(i * 100));
    }
    block.Complete();
});

Пример потребителя, который получает завершенные задачи из блока:

var consumer = Task.Run(async () =>
{
    while (await block.OutputAvailableAsync())
    {
        while (block.TryReceive(out var task))
        {
            Console.WriteLine($"Task Completed: {task.Status}");
        }
    }
});

Задачи принимаются в том же порядке, в котором они были добавлены в блок. Если вы хотите получить их, как только они будут завершены, настройте блок следующим образом:

new ExecutionDataflowBlockOptions()
{
    MaxDegreeOfParallelism = Int32.MaxValue,
    EnsureOrdered = false
}
...