Допустим, у меня есть две последовательности, возвращающие целые числа от 1 до 5.
Первая возвращает 1, 2 и 3 очень быстро, но 4 и 5 занимают 200 мс каждая.
public static IEnumerable<int> FastFirst()
{
for (int i = 1; i < 6; i++)
{
if (i > 3) Thread.Sleep(200);
yield return i;
}
}
секунда возвращает 1, 2 и 3 с задержкой 200 мс, но 4 и 5 возвращаются быстро.
public static IEnumerable<int> SlowFirst()
{
for (int i = 1; i < 6; i++)
{
if (i < 4) Thread.Sleep(200);
yield return i;
}
}
Объединяя обе эти последовательности, я получаю только числа от 1 до 5.
FastFirst().Union(SlowFirst());
Я не могу гарантировать, какой из двух методов имеет задержки в какой момент, поэтому порядок выполнения не может гарантировать решение для меня.Поэтому я хотел бы распараллелить объединение, чтобы минимизировать (искусственную) задержку в моем примере.
Реальный сценарий: у меня есть кэш, который возвращает некоторые сущности,и источник данных, который возвращает все сущностей.Я хотел бы иметь возможность возвращать итератор из метода, который внутренне распараллеливает запрос к кешу и источнику данных, чтобы кэшированные результаты давали как можно быстрее.
Примечание 1: Я понимаю, что этовсе еще тратить циклы процессора;Я не спрашиваю, как я могу предотвратить итерации последовательностей по своим медленным элементам, просто как я могу объединить их как можно быстрее.
Обновление 1: Я настроил achitaka-Отличный ответ Сан - принять нескольких производителей и использовать ContinueWhenAll, чтобы установить CompleteAdding в BlockingCollection только один раз.Я просто поместил это здесь, так как это потерялось бы в отсутствии форматирования комментариев.Любые дальнейшие отзывы будут отличными!
public static IEnumerable<TResult> SelectAsync<TResult>(
params IEnumerable<TResult>[] producer)
{
var resultsQueue = new BlockingCollection<TResult>();
var taskList = new HashSet<Task>();
foreach (var result in producer)
{
taskList.Add(
Task.Factory.StartNew(
() =>
{
foreach (var product in result)
{
resultsQueue.Add(product);
}
}));
}
Task.Factory.ContinueWhenAll(taskList.ToArray(), x => resultsQueue.CompleteAdding());
return resultsQueue.GetConsumingEnumerable();
}