Вы можете вернуть IEnumerable
, но для этого вы должны заблокировать ваш текущий поток. Вам необходим TransformBlock
для обработки идентификаторов и задача-фидер, которая асинхронно передает TransformBlock
с идентификаторами. Наконец, текущий поток войдет в цикл блокировки, ожидая, когда произведенный материал даст:
static IEnumerable<Stuff> GetLotsOfStuff(IEnumerable<int> ids)
{
var processor = new TransformBlock<int, Stuff>(async id =>
{
return await GetStuffById(id);
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 5,
BoundedCapacity = 50, // Avoid buffering millions of ids
});
var completionCTS = new CancellationTokenSource();
var feederTask = Task.Run(async () =>
{
foreach (int id in ids)
{
if (completionCTS.IsCancellationRequested) break;
await processor.SendAsync(id).ConfigureAwait(false);
}
processor.Complete();
});
try
{
while (processor.OutputAvailableAsync().Result)
{
while (processor.TryReceive(out var stuff))
{
yield return stuff;
}
}
feederTask.Wait(); // To propagate exceptions
processor.Completion.Wait(); // To propagate exceptions
}
finally // This runs when the caller completes the enumeration
{
completionCTS.Cancel();
}
}
Нет необходимости ConcurrentBag
, поскольку TransformBlock
имеет внутренний выходной буфер. Сложная часть имеет дело со случаем, когда вызывающая сторона откажется от перечисления IEnumerable<Stuff>
, сделав ранний разрыв или будучи заблокирована исключением. В этом случае вы не хотите, чтобы задача-фидер продолжала качать IEnumerable<int>
с идентификаторами до конца. К счастью есть решение . Заключение цикла уступки в блок try / finally позволяет получить уведомление об этом событии, что позволяет своевременно завершить задачу-фидер.
Альтернативная реализация может устранить необходимость в фидере-задача, объединяя прокачку идентификаторов, кормление блока и сдачу материала в одну петлю. В этом случае вы бы хотели отставание между накачкой и урожайностью. Для этого может быть полезен метод расширения MoreLinq Lag
(или Lead
).
Обновление: Вот другая реализация, которая перечисляет и возвращает в одном и том же цикле. Для достижения желаемого отставания перечисляемый источник дополняется справа некоторыми фиктивными элементами, число которых равно степени параллелизма.
Эта реализация принимает универсальные типы вместо int
и Stuff
.
public static IEnumerable<TResult> Transform<TSource, TResult>(
IEnumerable<TSource> source, Func<TSource, Task<TResult>> taskFactory,
int degreeOfConcurrency)
{
var processor = new TransformBlock<TSource, TResult>(async item =>
{
return await taskFactory(item);
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = degreeOfConcurrency
});
var paddedSource = source.Select(item => (item, true))
.Concat(Enumerable.Repeat((default(TSource), false), degreeOfConcurrency));
int index = -1;
bool completed = false;
foreach (var (item, hasValue) in paddedSource)
{
index++;
if (hasValue) { processor.Post(item); }
else if (!completed) { processor.Complete(); completed = true; }
if (index >= degreeOfConcurrency)
{
if (!processor.OutputAvailableAsync().Result) break; // Blocking call
if (!processor.TryReceive(out var result))
throw new InvalidOperationException(); // Should never happen
yield return result;
}
}
processor.Completion.Wait();
}
Пример использования:
IEnumerable<Stuff> lotsOfStuff = Transform(ids, GetStuffById, 5);
Обе реализации можно тривиально изменить, чтобы они возвращали IAsyncEnumerable
вместо IEnumerable
, чтобы избежать блокировки вызывающего потока.