Как ждать результатов IAsyncEnumerable с указанным c уровнем параллелизма - PullRequest
0 голосов
/ 24 февраля 2020

У меня есть асинхронный поток задач, который генерируется путем применения асинхронной c лямбды к потоку элементов:

IAsyncEnumerable<int> streamOfItems = AsyncEnumerable.Range(1, 10);
IAsyncEnumerable<Task<string>> streamOfTasks = streamOfItems.Select(async x =>
{
    await Task.Delay(100);
    return x.ToString();
})

Методы AsyncEnumerable.Range и Select, приведенные выше, предоставляются из пакет System.Linq.Async.

Нужный результат - это поток результатов, выраженный как IAsyncEnumerable<string>. Результаты должны быть переданы в том же порядке, что и исходные задачи. Кроме того, необходимо ограничить перечисление потока, чтобы в любой момент времени было активным не более указанного числа задач.

Я хотел бы получить решение в виде метода расширения для IAsyncEnumerable<Task<T>> типа, чтобы я мог связать его несколько раз и сформировать конвейер обработки, аналогичный по функциональности с конвейером TPL Dataflow , но выраженный бегло. Ниже приведена подпись желаемого метода расширения:

public async static IAsyncEnumerable<TResult> AwaitResults<TResult>(
    this IAsyncEnumerable<Task<TResult>> source,
    int concurrencyLevel);

Принятие также CancellationToken в качестве аргумента было бы хорошей функцией.


Обновление: Для полноты изложения приведу пример плавного конвейера обработки, образованного двойным сцеплением по методу AwaitResults. Этот конвейер начинается с блока PLINQ, просто чтобы продемонстрировать, что возможно смешивание PLINQ и Linq.Asyn c.

int[] results = await Enumerable.Range(1, 20)
    .AsParallel()
    .AsOrdered()
    .WithDegreeOfParallelism(2)
    .WithMergeOptions(ParallelMergeOptions.NotBuffered)
    .Select(x =>
    {
        Thread.Sleep(100); // Simulate some CPU-bound operation
        return x;
    })
    .ToAsyncEnumerable()
    .Select(async x =>
    {
        await Task.Delay(300); // Simulate some I/O operation
        return x;
    })
    .AwaitResults(concurrencyLevel: 5)
    .Select(x => Task.Run(() =>
    {
        Thread.Sleep(100); // Simulate another CPU-bound operation
        return x;
    }))
    .AwaitResults(concurrencyLevel: 2)
    .ToArrayAsync();
Console.WriteLine($"Results: {String.Join(", ", results)}");

Ожидаемый результат:

Результаты: 1, 2 , 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20

1 Ответ

0 голосов
/ 24 февраля 2020

Вот моя реализация метода AwaitResults. Он основан на SemaphoreSlim для управления уровнем параллелизма и Channel<Task<TResult>>, который используется в качестве асинхронной очереди c. Перечисление источника IAsyncEnumerable<Task<TResult>> происходит внутри задачи «забей и забудь» (фидер), которая выталкивает горячие задачи в канал. Он также прикрепляет продолжение к каждой задаче, где освобождается семафор.

Последняя часть метода - это результат l oop, где задачи снимаются с канала по очереди, а затем ожидаются последовательно. Таким образом, результаты выдаются в том же порядке, что и задачи в исходном потоке.

Эта реализация требует, чтобы каждая задача ожидалась дважды, что означает, что ее нельзя использовать для источника типа IAsyncEnumerable<ValueTask<TResult>>, поскольку ValueTask можно ожидать только один раз .

public async static IAsyncEnumerable<TResult> AwaitResults<TResult>(
    this IAsyncEnumerable<Task<TResult>> source,
    int concurrencyLevel = 1,
    [EnumeratorCancellation]CancellationToken cancellationToken = default)
{
    if (source == null) throw new ArgumentNullException(nameof(source));
    if (concurrencyLevel < 1)
        throw new ArgumentOutOfRangeException(nameof(concurrencyLevel));

    var semaphore = new SemaphoreSlim(concurrencyLevel - 1);
    var channelCapacity = Math.Max(1000, concurrencyLevel * 10);
    var tasksChannel = Channel.CreateBounded<Task<TResult>>(channelCapacity);
    var completionCts = CancellationTokenSource.CreateLinkedTokenSource(
        cancellationToken);

    // Feeder task: fire and forget
    _ = Task.Run(async () =>
    {
        try
        {
            await foreach (var task in source
                .WithCancellation(completionCts.Token).ConfigureAwait(false))
            {
                HandleTaskCompletion(task);
                await tasksChannel.Writer.WriteAsync(task, completionCts.Token)
                    .ConfigureAwait(false);
                await semaphore.WaitAsync(completionCts.Token)
                    .ConfigureAwait(false); // Acquire before MoveNextAsync
            }
            tasksChannel.Writer.Complete();
        }
        catch (Exception ex)
        {
            tasksChannel.Writer.Complete(ex);
        }
    });

    async void HandleTaskCompletion(Task task)
    {
        try
        {
            await task.ConfigureAwait(false);
        }
        catch
        {
            // Ignore exceptions here
        }
        finally
        {
            semaphore.Release();
        }
    }

    try
    {
        while (await tasksChannel.Reader.WaitToReadAsync(cancellationToken)
            .ConfigureAwait(false))
        {
            while (tasksChannel.Reader.TryRead(out var task))
            {
                yield return await task.ConfigureAwait(false);
                cancellationToken.ThrowIfCancellationRequested();
            }
        }
    }
    finally // Happens when the caller disposes the output enumerator
    {
        completionCts.Cancel();
    }
}

Важной деталью является блок try-finally вокруг финального результата l oop. Это необходимо для случая, когда вызывающая сторона метода преждевременно прекращает перечисление результирующего потока. В этом случае перечисление исходного потока также должно быть прекращено, и это завершение распространяется в обратном направлении, используя CancellationTokenSource. Без этого задание подачи никогда не будет выполнено, объекты никогда не будут собирать мусор, а память будет вытекать.

Примечание: Отмена cancellationToken может не отменить всю операцию мгновенно , Для максимальной отзывчивости следует использовать тот же cancellationToken для отмены отдельных задач.

...