Как использовать C # 8 IAsyncEnumerable <T>для асинхронного перечисления задач, выполняемых параллельно - PullRequest
0 голосов
/ 09 июня 2019

Если возможно, я хочу создать асинхронный перечислитель для задач, запускаемых параллельно. Таким образом, первым для завершения является первый элемент перечисления, вторым для завершения является второй элемент перечисления и т. Д.

public static async IAsyncEnumerable<T> ParallelEnumerateAsync(this IEnumerable<Task<T>> coldAsyncTasks)
{
    // ... 
}

Могу поспорить, что есть способ использовать ContinueWith и Queue<T>, но я не полностью доверяю себе в его реализации.

Ответы [ 2 ]

1 голос
/ 10 июня 2019

Если я правильно понимаю ваш вопрос, ваша задача - запустить все задачи, разрешить им всем работать параллельно, но убедиться, что возвращаемые значения обрабатываются в том же порядке, в котором были запущены задачи.

Проверкаиз спецификации, с C # 8.0 Асинхронные потоки постановка в очередь задач для параллельного выполнения, но последовательного возврата может выглядеть какthis.

async Task RunAsyncStreams()
{
    await foreach (var n in RunAndPreserveOrderAsync(GenerateTasks(6)))
    {
        Console.WriteLine($"#{n} is returned");
    }
}

IEnumerable<Task<int>> GenerateTasks(int count)
{
    return Enumerable.Range(1, count).Select(async n =>
    {
        await Task.Delay(new Random().Next(100, 1000));
        Console.WriteLine($"#{n} is complete");
        return n;
    });
}

async IAsyncEnumerable<int> RunAndPreserveOrderAsync(IEnumerable<Task<int>> tasks)
{
    var queue = new Queue<Task<int>>(tasks);
    while (queue.Count > 0) yield return await queue.Dequeue();
}

Возможный вывод:

#5 is complete
#1 is complete
#1 is returned
#3 is complete
#6 is complete
#2 is complete
#2 is returned
#3 is returned
#4 is complete
#4 is returned
#5 is returned
#6 is returned

С практической точки зрения, похоже, не существует никакой новой языковой поддержки для этого шаблона, и, кроме того, поскольку асинхронныйпотоки имеют дело с IAsyncEnumerable<T>, это означает, что база Task не будет работать здесь, и все рабочие методы async должны иметь одинаковый тип возврата Task<T>, что несколько ограничивает асинхронный дизайн на основе потоков.

Из-за этого и в зависимости от вашей ситуации (Хотите ли вы отменить длительные задачи? Требуется ли обработка исключений для каждой задачи? Должно ли быть ограничение на число одновременныхарендовать задачи?) может иметь смысл проверить предложения @TheGeneral там.

1 голос
/ 10 июня 2019

Это то, что вы ищете?

public static async IAsyncEnumerable<T> ParallelEnumerateAsync<T>(
    this IEnumerable<Task<T>> tasks)
{
    var remaining = new List<Task<T>>(tasks);

    while (remaining.Count != 0)
    {
        var task = await Task.WhenAny(remaining);
        remaining.Remove(task);
        yield return (await task);
    }
}
...