Как реализовать эффективный WhenEach, который передает IAsyncEnumerable результатов задачи? - PullRequest
3 голосов
/ 02 октября 2019

Я пытаюсь обновить свой набор инструментов новыми инструментами, предлагаемыми C # 8 , и один метод, который кажется особенно полезным, - это версия Task.WhenAll, которая возвращает IAsyncEnumerable. Этот метод должен транслировать результаты задачи, как только они станут доступны, поэтому присвоение ему имени WhenAll не имеет особого смысла. WhenEach звучит более уместно. Сигнатура метода:

public static IAsyncEnumerable<TResult> WhenEach<TResult>(Task<TResult>[] tasks);

Этот метод можно использовать следующим образом:

var tasks = new Task<int>[]
{
    ProcessAsync(1, 300),
    ProcessAsync(2, 500),
    ProcessAsync(3, 400),
    ProcessAsync(4, 200),
    ProcessAsync(5, 100),
};

await foreach (int result in WhenEach(tasks))
{
    Console.WriteLine($"Processed: {result}");
}

static async Task<int> ProcessAsync(int result, int delay)
{
    await Task.Delay(delay);
    return result;
}

Ожидаемый результат:

Обработано: 5
Обработано: 4
Обработано: 1
Обработано: 3
Обработано: 2

Мне удалось написать базовую реализацию, используя метод Task.WhenAnyв цикле, но есть проблема с этим подходом:

public static async IAsyncEnumerable<TResult> WhenEach<TResult>(
    Task<TResult>[] tasks)
{
    var hashSet = new HashSet<Task<TResult>>(tasks);
    while (hashSet.Count > 0)
    {
        var task = await Task.WhenAny(hashSet).ConfigureAwait(false);
        yield return await task.ConfigureAwait(false);
        hashSet.Remove(task);
    }
}

Проблема заключается в производительности. Реализация Task.WhenAny создает защитную копию предоставленного списка задач, поэтому повторный вызов в цикле приводит к O (n²) вычислительной сложности. Моя наивная реализация пытается обработать 10 000 задач. Перегрузка в моей машине составляет почти 10 секунд. Мне бы хотелось, чтобы этот метод был почти таким же быстродействующим, как и встроенный Task.WhenAll, который может с легкостью обрабатывать сотни тысяч задач. Как можно улучшить метод WhenEach, чтобы он работал прилично?

Ответы [ 3 ]

2 голосов
/ 02 октября 2019

Вы можете использовать канал в качестве асинхронной очереди. Каждое задание может записываться в канал после его завершения. Элементы в канале будут возвращены как IAsyncEnumerable через ChannelReader.ReadAllAsync .

IAsyncEnumerable<T> ToAsyncEnumerable<T>(IEnumerable<Task<T>> inputTasks)
{
    var channel=Channel.CreateUnbounded<T>();
    var writer=channel.Writer;
    var continuations=inputTasks.Select(t=>t.ContinueWith(x=>
                                           writer.TryWrite(x.Result)));
    _ = Task.WhenAll(continuations)
            .ContinueWith(t=>writer.Complete(t.Exception));

    return channel.Reader.ReadAllAsync();
}

Когда все задачи завершены, writer.Complete() вызывается для закрытия канала.

Для проверки этого кода создаются задачи с уменьшающимися задержками. Это должно вернуть индексы в обратном порядке:

var tasks=Enumerable.Range(1,4)
                    .Select(async i=>
                    { 
                      await Task.Delay(300*(5-i));
                      return i;
                    });

await foreach(var i in Interleave(tasks))
{
     Console.WriteLine(i);

}

Производит:

4
3
2
1
2 голосов
/ 02 октября 2019

Просто для удовольствия, используя System.Reactive и System.Interactive.Async:

public static async IAsyncEnumerable<TResult> WhenEach<TResult>(
    Task<TResult>[] tasks)
    => Observable.Merge(tasks.Select(t => t.ToObservable())).ToAsyncEnumerable()
1 голос
/ 02 октября 2019

Используя код из этой статьи, вы можете реализовать следующее:

public static Task<Task<T>>[] Interleaved<T>(IEnumerable<Task<T>> tasks)
{
   var inputTasks = tasks.ToList();

   var buckets = new TaskCompletionSource<Task<T>>[inputTasks.Count];
   var results = new Task<Task<T>>[buckets.Length];
   for (int i = 0; i < buckets.Length; i++)
   {
       buckets[i] = new TaskCompletionSource<Task<T>>();
       results[i] = buckets[i].Task;
   }

   int nextTaskIndex = -1;
   Action<Task<T>> continuation = completed =>
   {
       var bucket = buckets[Interlocked.Increment(ref nextTaskIndex)];
       bucket.TrySetResult(completed);
   };

   foreach (var inputTask in inputTasks)
       inputTask.ContinueWith(continuation, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);

   return results;
}

Затем измените свой WhenEach на вызов Interleaved кода

public static async IAsyncEnumerable<TResult> WhenEach<TResult>(Task<TResult>[] tasks)
{
    foreach (var bucket in Interleaved(tasks))
    {
        var t = await bucket;
        yield return await t;
    }
}

Затем вы можете позвонить своему WhenEach как обычно

await foreach (int result in WhenEach(tasks))
{
    Console.WriteLine($"Processed: {result}");
}

Я выполнил некоторый элементарный бенчмаркинг с 10 тыс. Заданий и выполнил в 5 раз лучшую скорость.

...