Существует проблема с кодом, который вы показали, а именно, он не имеет разумного канала связи между работником и создателем задачи. Вам нужен какой-то механизм обмена сообщениями, чтобы уведомить работника о новых задачах (и когда задач больше нет), чтобы он мог на него реагировать. Это то, что вы должны выяснить для своей параллельной системы, и точная реализация имеет отношение к вопросу, поэтому я просто предположу, что у нас есть методы OnTaskAdded(Task task)
и OnEnd()
в нашем работнике.
Из чеговы говорите, что вы не хотите действительно ждать, пока какая-либо задача завершится, а скорее для каждой задачи выполнить что-то, когда она завершится. СМОТРИТЕ ОБНОВЛЕННЫЙ ОТВЕТ НИЖЕ. Этого можно достичь с помощью ContinueWith
:
class Worker
{
private List<Task> _tasks = new List<Task>();
private readonly Stopwatch _stopwatch = new Stopwatch();
// Start the stopwatch in the constructor or in some kind of a StartProcessing method.
void OnTaskAdded(Task<int> task)
{
var taskWithContinuation = task.ContinueWith(t =>
Console.WriteLine("Task {0} found to be completed at: {1}", t.Result, _stopwatch.Elapsed));
_tasks.Add(taskWithContinuation);
}
async Task OnEndAsync()
{
// We're finishing work and there will be no more tasks, it's safe to await them all now.
await Task.WhenAll(_tasks);
}
}
РЕДАКТИРОВАТЬ: После всего этого морализаторского разговора о том, как обеспечить разумный конвейер обмена сообщениями, я рассчитывал, что на самом деле могу дать вам быструю и грязную реализацию простотак что вы можете видеть, что это работает:
// DISCLAIMER: NOT PRODUCTION CODE!!!
public static async Task Main()
{
Stopwatch sw = new Stopwatch(); sw.Start();
// Start a Fire and Forget Task to create some running tasks
var _ = AddTasksAsync(10, 1, 3000);
var internalList = new List<Task>();
// while there are tasks to complete use the main thread to process them as they comeplete
var i = 0;
while (i < 10)
{
while (taskList.Count <= i)
{
// No new tasks, check again after a delay -- THIS IS VERY BAD!
await Task.Delay(100);
}
Console.WriteLine("Task {0} intercepted at: {1}", i + 1, sw.Elapsed);
var taskWithContinuation = taskList[i].ContinueWith(t =>
Console.WriteLine("Task {0} found to be completed at: {1}", t.Result, sw.Elapsed));
internalList.Add(taskWithContinuation);
++i;
}
await Task.WhenAll(internalList);
}
Позвольте мне еще раз подчеркнуть: это , а не код качества производства! Активно жду новых заданий, тьфу. Его вывод выглядит примерно так:
Task 1 intercepted at: 00:00:00.0495570
Starting Task: 1 with a duration of 60 seconds
Starting Task: 2 with a duration of 7 seconds
Task 2 intercepted at: 00:00:05.8459622
Starting Task: 3 with a duration of 24 seconds
Task 3 intercepted at: 00:00:07.2626124
Starting Task: 4 with a duration of 15 seconds
Task 4 intercepted at: 00:00:09.2257285
Starting Task: 5 with a duration of 28 seconds
Task 5 intercepted at: 00:00:10.3058738
Starting Task: 6 with a duration of 21 seconds
Task 6 intercepted at: 00:00:10.6376981
Starting Task: 7 with a duration of 11 seconds
Task 7 intercepted at: 00:00:10.7507146
Starting Task: 8 with a duration of 29 seconds
Task 8 intercepted at: 00:00:11.7107754
Task 2 found to be completed at: 00:00:12.8111589
Starting Task: 9 with a duration of 21 seconds
Task 9 intercepted at: 00:00:13.7883430
Starting Task: 10 with a duration of 20 seconds
Task 10 intercepted at: 00:00:14.6707959
Task 7 found to be completed at: 00:00:21.6692276
Task 4 found to be completed at: 00:00:24.2125638
Task 3 found to be completed at: 00:00:31.2276640
Task 6 found to be completed at: 00:00:31.5908324
Task 10 found to be completed at: 00:00:34.5585143
Task 9 found to be completed at: 00:00:34.7053864
Task 5 found to be completed at: 00:00:38.2616534
Task 8 found to be completed at: 00:00:40.6372696
Task 1 found to be completed at: 00:01:00.0720695
Вы можете видеть, что строки немного перетасованы из-за характера многопоточной работы, но временные метки точны.
ОБНОВЛЕНИЕ:
Ну, я довольно тупой, я только что пригласил вас на анти-паттерн. Использование ContinueWith
опасно , более того, оно слишком сложно - введено async
/ await
, чтобы освободить нас от продолжения планирования вручную. Вы можете просто обернуть Task<int>
с помощью операции, которая await
s и записывает время .
class Worker
{
private List<Task> _tasks = new List<Task>();
private readonly Stopwatch _stopwatch = new Stopwatch();
// Start the stopwatch in the constructor or in some kind of a StartProcessing method.
void OnTaskAdded(Task<int> task)
{
var taskWithContinuation = ContinueWithLog(task);
_tasks.Add(taskWithContinuation);
}
async Task OnEndAsync()
{
// We're finishing work and there will be no more tasks, it's safe to await them all now.
await Task.WhenAll(_tasks);
}
private Task ContinueWithLog(Task<int> task)
{
var i = await source;
Console.WriteLine("Task {0} found to be completed at: {1}", i, sw.Elapsed);
}
}
Используя ваш пример кода для быстрого и грязного PoC:
class Program
{
static readonly List<Task<int>> taskList = new List<Task<int>>();
static readonly Random rnd = new Random(1);
static readonly Stopwatch sw = new Stopwatch();
static async Task<int> RunTaskAsync(int taskID, int taskDuration)
{
await Task.Yield();
Console.WriteLine("Starting Task: {0} with a duration of {1} seconds", taskID, taskDuration / 1000);
await Task.Delay(taskDuration); // mimic some work
return taskID;
}
static async Task AddTasksAsync(int numTasks, int minDelay, int maxDelay)
{
// Add numTasks asyncronously to the taskList
// First task is added Syncronously and then we yield the adds to a worker
taskList.Add(RunTaskAsync(1, 60000)); // Make the first task run for 60 seconds
await Task.Delay(5000); // wait 5 seconds to ensure that the WhenAny is started with One task
// remaing task run's are Yielded to a worker thread
for (int i = 2; i <= numTasks; i++)
{
await Task.Delay(rnd.Next(minDelay, maxDelay));
taskList.Add(RunTaskAsync(i, rnd.Next(5, 30) * 1000));
}
}
public static async Task ContinueWithLog(Task<int> source)
{
var i = await source;
Console.WriteLine("Task {0} found to be completed at: {1}", i, sw.Elapsed);
}
public static async Task Main()
{
sw.Start();
// Start a Fire and Forget Task to create some running tasks
var _ = AddTasksAsync(10, 1, 3000);
var internalList = new List<Task>();
// while there are tasks to complete use the main thread to process them as they comeplete
var i = 0;
while (i < 10)
{
while (taskList.Count <= i)
{
// No new tasks, check again after a delay -- THIS IS VERY BAD!
await Task.Delay(100);
}
Console.WriteLine("Task {0} intercepted at: {1}", i + 1, sw.Elapsed);
internalList.Add(ContinueWithLog(taskList[i]));
++i;
}
await Task.WhenAll(internalList);
}
}
Вывод:
Starting Task: 1 with a duration of 60 seconds
Task 1 intercepted at: 00:00:00.0525006
Starting Task: 2 with a duration of 7 seconds
Task 2 intercepted at: 00:00:05.8551382
Starting Task: 3 with a duration of 24 seconds
Task 3 intercepted at: 00:00:07.2687049
Starting Task: 4 with a duration of 15 seconds
Task 4 intercepted at: 00:00:09.2404507
Starting Task: 5 with a duration of 28 seconds
Task 5 intercepted at: 00:00:10.3325019
Starting Task: 6 with a duration of 21 seconds
Task 6 intercepted at: 00:00:10.6654663
Starting Task: 7 with a duration of 11 seconds
Task 7 intercepted at: 00:00:10.7809841
Starting Task: 8 with a duration of 29 seconds
Task 8 intercepted at: 00:00:11.7576237
Task 2 found to be completed at: 00:00:12.8151955
Starting Task: 9 with a duration of 21 seconds
Task 9 intercepted at: 00:00:13.7228579
Starting Task: 10 with a duration of 20 seconds
Task 10 intercepted at: 00:00:14.5829039
Task 7 found to be completed at: 00:00:21.6848699
Task 4 found to be completed at: 00:00:24.2089671
Task 3 found to be completed at: 00:00:31.2300136
Task 6 found to be completed at: 00:00:31.5847257
Task 10 found to be completed at: 00:00:34.5550722
Task 9 found to be completed at: 00:00:34.6904076
Task 5 found to be completed at: 00:00:38.2835777
Task 8 found to be completed at: 00:00:40.6445029
Task 1 found to be completed at: 00:01:00.0826952
Это идиоматический способ добиться того, чего вы хотите. Извините, что сначала ввел вас в заблуждение, введя ContinueWith
, это ненужно и подвержено ошибкам, теперь мы оба знаем.