У меня есть асинхронный поток задач, который генерируется путем применения асинхронной c лямбды к потоку элементов:
IAsyncEnumerable<int> streamOfItems = AsyncEnumerable.Range(1, 10);
IAsyncEnumerable<Task<string>> streamOfTasks = streamOfItems.Select(async x =>
{
await Task.Delay(100);
return x.ToString();
})
Методы AsyncEnumerable.Range
и Select
, приведенные выше, предоставляются из пакет System.Linq.Async
.
Нужный результат - это поток результатов, выраженный как IAsyncEnumerable<string>
. Результаты должны быть переданы в том же порядке, что и исходные задачи. Кроме того, необходимо ограничить перечисление потока, чтобы в любой момент времени было активным не более указанного числа задач.
Я хотел бы получить решение в виде метода расширения для IAsyncEnumerable<Task<T>>
типа, чтобы я мог связать его несколько раз и сформировать конвейер обработки, аналогичный по функциональности с конвейером TPL Dataflow , но выраженный бегло. Ниже приведена подпись желаемого метода расширения:
public async static IAsyncEnumerable<TResult> AwaitResults<TResult>(
this IAsyncEnumerable<Task<TResult>> source,
int concurrencyLevel);
Принятие также CancellationToken
в качестве аргумента было бы хорошей функцией.
Обновление: Для полноты изложения приведу пример плавного конвейера обработки, образованного двойным сцеплением по методу AwaitResults
. Этот конвейер начинается с блока PLINQ, просто чтобы продемонстрировать, что возможно смешивание PLINQ и Linq.Asyn c.
int[] results = await Enumerable.Range(1, 20)
.AsParallel()
.AsOrdered()
.WithDegreeOfParallelism(2)
.WithMergeOptions(ParallelMergeOptions.NotBuffered)
.Select(x =>
{
Thread.Sleep(100); // Simulate some CPU-bound operation
return x;
})
.ToAsyncEnumerable()
.Select(async x =>
{
await Task.Delay(300); // Simulate some I/O operation
return x;
})
.AwaitResults(concurrencyLevel: 5)
.Select(x => Task.Run(() =>
{
Thread.Sleep(100); // Simulate another CPU-bound operation
return x;
}))
.AwaitResults(concurrencyLevel: 2)
.ToArrayAsync();
Console.WriteLine($"Results: {String.Join(", ", results)}");
Ожидаемый результат:
Результаты: 1, 2 , 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20