РЕДАКТИРОВАТЬ : Я считаю, что нашел решение.Согласно документации на IEnumerable.MoveNext
(выделено):
Если MoveNext проходит конец коллекции, перечислитель располагается после последнего элемента в коллекции и MoveNextвозвращает ложьКогда перечислитель находится в этой позиции, последующие вызовы MoveNext также возвращают false до тех пор, пока Reset не будет вызван .
Согласно следующей логике:
private bool ReachedLimit
{
get
{
if (this.limit == null)
{
return false;
}
for (int i = 0; i < this.buffer.Length; i++)
{
if (this.buffer[i] != this.limit[i])
{
return false;
}
}
return true;
}
}
Вызов MoveNext()
вернет false только один раз - когда буфер точно равен пределу.После того, как вы превысили лимит, возвращаемое значение из ReachedLimit
снова станет ложным, и return !this.ReachedLimit
вернет true, так что перечислитель будет продолжаться после конца лимита до тех пор, пока у него не закончатся символы для перечисления.,По-видимому, в реализации ParallelQuery.Count()
, MoveNext()
вызывается несколько раз, когда он достигает конца, и, поскольку он снова начинает возвращать истинное значение, перечислитель с радостью продолжает возвращать больше элементов (в вашем случае это не так).пользовательский код, который обходит перечислитель вручную и, по-видимому, также не относится к вызову ForAll
, поэтому они «случайно» возвращают правильные результаты).
Самое простое решение - запомнить возвращаемое значениес MoveNext()
как только оно становится ложным:
private bool _canMoveNext = true;
public bool MoveNext()
{
if (!_canMoveNext) return false;
...
if (this.position == this.buffer.Length)
{
if (this.ReachedLimit) _canMoveNext = false;
...
}
Теперь, когда оно начинает возвращать ложное значение, оно будет возвращать ложное значение для каждого будущего вызова, и это возвращает правильный результат из AsParallel().Count()
.Надеюсь, что это поможет!
Документация по Замечания по Partitioner (выделено мной):
Статические методы на Partitioner являются поточно-ориентированными и могутиспользоваться одновременно из нескольких потоков.Тем не менее, пока используется созданный разделитель, базовый источник данных не следует изменять , будь то из того же потока, который использует разделитель, или из отдельного потока.
Из того, что я могу понять из кода, который вы дали, может показаться, что ParallelQuery.Count()
, скорее всего, будет иметь проблемы с безопасностью потоков, поскольку возможно, что он может повторять несколько перечислителей одновременно, тогда как для всех других решений требуются перечислителибыть запущен синхронизирован.Не видя код, который вы используете для MyGenerator
и MyPartitioner
, сложно определить, могут ли проблемы безопасности потоков быть причиной.
Для демонстрации я написал простой перечислитель, который возвращаетпервые сто чисел в виде строк.Кроме того, у меня есть разделитель, который распределяет элементы в базовом перечислителе по совокупности numPartitions
отдельных списков.Используя все методы, которые вы описали выше, на нашем 12-ядерном сервере (когда я вывожу numPartitions
, он использует 12
по умолчанию на этой машине), я получаю ожидаемый результат 100 (это код LINQPad-ready):
void Main()
{
var partitioner = new SimplePartitioner(GetEnumerator());
GetEnumerator().Count().Dump();
partitioner.GetPartitions(10).Select(enumerator =>
{
int count = 0;
while (enumerator.MoveNext())
{
count++;
}
return count;
}).Sum().Dump();
var theCount = 0;
partitioner.AsParallel().ForAll(e => Interlocked.Increment(ref theCount));
theCount.Dump();
partitioner.AsParallel().Count().Dump();
}
// Define other methods and classes here
public IEnumerable<string> GetEnumerator()
{
for (var i = 1; i <= 100; i++)
yield return i.ToString();
}
public class SimplePartitioner : Partitioner<string>
{
private IEnumerable<string> input;
public SimplePartitioner(IEnumerable<string> input)
{
this.input = input;
}
public override IList<IEnumerator<string>> GetPartitions(int numPartitions)
{
var list = new List<string>[numPartitions];
for (var i = 0; i < numPartitions; i++)
list[i] = new List<string>();
var index = 0;
foreach (var s in input)
list[(index = (index + 1) % numPartitions)].Add(s);
IList<IEnumerator<string>> result = new List<IEnumerator<string>>();
foreach (var l in list)
result.Add(l.GetEnumerator());
return result;
}
}
Вывод:
100
100
100
100
Это явно работает.Без дополнительной информации невозможно сказать, что не работает в вашей конкретной реализации.