Я думал об этом время от времени в течение последних нескольких дней, и я не могу найти встроенный PLINQ способ сделать это в C # 4.0. Принятый ответ на этот вопрос об использовании FirstOrDefault не возвращает значение, пока не завершится полный запрос PLINQ, и все еще возвращает (упорядоченный) первый результат. В следующем крайнем примере показано поведение:
var cts = new CancellationTokenSource();
var rnd = new ThreadLocal<Random>(() => new Random());
var q = Enumerable.Range(0, 11).Select(x => x).AsParallel()
.WithCancellation(cts.Token).WithMergeOptions( ParallelMergeOptions.NotBuffered).WithDegreeOfParallelism(10).AsUnordered()
.Where(i => i % 2 == 0 )
.Select( i =>
{
if( i == 0 )
Thread.Sleep(3000);
else
Thread.Sleep(rnd.Value.Next(50, 100));
return string.Format("dat {0}", i).Dump();
});
cts.CancelAfter(5000);
// waits until all results are in, then returns first
q.FirstOrDefault().Dump("result");
Я не вижу встроенного способа немедленно получить первый доступный результат, но мне удалось найти два обходных пути.
Первый создает Задачи для выполнения работы и возвращает Задачу, что приводит к быстрому выполнению запроса PLINQ. Результирующие задачи могут быть переданы в WaitAny для получения первого результата, как только он станет доступен:
var cts = new CancellationTokenSource();
var rnd = new ThreadLocal<Random>(() => new Random());
var q = Enumerable.Range(0, 11).Select(x => x).AsParallel()
.WithCancellation(cts.Token).WithMergeOptions( ParallelMergeOptions.NotBuffered).WithDegreeOfParallelism(10).AsUnordered()
.Where(i => i % 2 == 0 )
.Select( i =>
{
return Task.Factory.StartNew(() =>
{
if( i == 0 )
Thread.Sleep(3000);
else
Thread.Sleep(rnd.Value.Next(50, 100));
return string.Format("dat {0}", i).Dump();
});
});
cts.CancelAfter(5000);
// returns as soon as the tasks are created
var ts = q.ToArray();
// wait till the first task finishes
var idx = Task.WaitAny( ts );
ts[idx].Result.Dump("res");
Это, наверное, ужасный способ сделать это. Поскольку фактическая работа запроса PLINQ - это очень быстрая задача Task.Factory.StartNew, бессмысленно использовать PLINQ вообще. Простой .Select( i => Task.Factory.StartNew( ...
в IEnumerable чище и, вероятно, быстрее.
Второй обходной путь использует очередь (BlockingCollection) и просто вставляет результаты в эту очередь после их вычисления:
var cts = new CancellationTokenSource();
var rnd = new ThreadLocal<Random>(() => new Random());
var q = Enumerable.Range(0, 11).Select(x => x).AsParallel()
.WithCancellation(cts.Token).WithMergeOptions( ParallelMergeOptions.NotBuffered).WithDegreeOfParallelism(10).AsUnordered()
.Where(i => i % 2 == 0 )
.Select( i =>
{
if( i == 0 )
Thread.Sleep(3000);
else
Thread.Sleep(rnd.Value.Next(50, 100));
return string.Format("dat {0}", i).Dump();
});
cts.CancelAfter(5000);
var qu = new BlockingCollection<string>();
// ForAll blocks until PLINQ query is complete
Task.Factory.StartNew(() => q.ForAll( x => qu.Add(x) ));
// get first result asap
qu.Take().Dump("result");
С помощью этого метода работа выполняется с использованием PLINQ, а функция Take () BlockingCollecion вернет первый результат, как только он будет вставлен в запрос PLINQ.
Хотя это приводит к желаемому результату, я не уверен, что он имеет какое-либо преимущество перед простым использованием простых задач + WaitAny