На что рассчитывает счет ParallelQuerys? - PullRequest
2 голосов
/ 30 ноября 2011

Я тестирую самописанный генератор элементов (ICollection<string>) и сравниваю рассчитанное количество с фактическим количеством, чтобы понять, есть ли ошибка в моем алгоритме или нет.

Поскольку этот генератор может генерировать множество элементов по требованию, я смотрю в Partitioner<string>, и я реализовал базовый, который, кажется, также производит допустимые перечислители, которые вместе дают то же количество string с, что и вычисленное. 1008 *

Теперь я хочу проверить, как это происходит при параллельном запуске (снова сначала проверить правильность подсчета) :

MyGenerator generator = new MyGenerator();
MyPartitioner partitioner = new MyPartitioner(generator);

int isCount = partitioner.AsParallel().Count();
int shouldCount = generator.Count;

bool same = isCount == shouldCount; // false

Не понимаю, почему этот счет не равен! Что делает ParallelQuery<string>?

generator.Count() == generator.Count // true

partitioner.GetPartitions(xyz).Select(enumerator =>
    {
        int count = 0;
        while (enumerator.MoveNext())
        {
            count++;
        }
        return count;
    }).Sum() == generator.Count // true

Итак, в настоящее время я не вижу ошибки в моем коде. Затем я попытался подсчитать, что ParallelQuery<string>:

int count = 0;
partitioner.AsParallel().ForAll(e => Interlocked.Increment(ref count));
count == generator.Count // true

Суммировано: Каждый считает мой перечислимый правильный, ParallelQuery.ForAll перечисляет ровно generator.Count элементов. Но что значит ParallelQuery.Count()?

Если правильный счет равен примерно 10 КБ, ParallelQuery видит 40 КБ.


    internal sealed class PartialWordEnumerator : IEnumerator<string>
    {
        private object sync = new object();

        private readonly IEnumerable<char> characters;

        private readonly char[] limit;

        private char[] buffer;
        private IEnumerator<char>[] enumerators;

        private int position = 0;

        internal PartialWordEnumerator(IEnumerable<char> characters, char[] state, char[] limit)
        {
            this.characters = new List<char>(characters);

            this.buffer = (char[])state.Clone();

            if (limit != null)
            {
                this.limit = (char[])limit.Clone();
            }

            this.enumerators = new IEnumerator<char>[this.buffer.Length];

            for (int i = 0; i < this.buffer.Length; i++)
            {
                this.enumerators[i] = SkipTo(state[i]);
            }
        }

        private IEnumerator<char> SkipTo(char c)
        {
            IEnumerator<char> first = this.characters.GetEnumerator();
            IEnumerator<char> second = this.characters.GetEnumerator();

            while (second.MoveNext())
            {
                if (second.Current == c)
                {
                    return first;
                }

                first.MoveNext();
            }

            throw new InvalidOperationException();
        }

        private bool ReachedLimit
        {
            get
            {
                if (this.limit == null)
                {
                    return false;
                }

                for (int i = 0; i < this.buffer.Length; i++)
                {
                    if (this.buffer[i] != this.limit[i])
                    {
                        return false;
                    }
                }

                return true;
            }
        }

        public string Current
        {
            get
            {
                if (this.buffer == null)
                {
                    throw new ObjectDisposedException(typeof(PartialWordEnumerator).FullName);
                }

                return new string(this.buffer);
            }
        }

        object IEnumerator.Current
        {
            get { return this.Current; }
        }

        public bool MoveNext()
        {
            lock (this.sync)
            {
                if (this.position == this.buffer.Length)
                {
                    this.position--;
                }

                if (this.position == -1)
                {
                    return false;
                }

                IEnumerator<char> enumerator = this.enumerators[this.position];

                if (enumerator.MoveNext())
                {
                    this.buffer[this.position] = enumerator.Current;
                    this.position++;

                    if (this.position == this.buffer.Length)
                    {
                        return !this.ReachedLimit;
                    }
                    else
                    {
                        return this.MoveNext();
                    }
                }
                else
                {
                    this.enumerators[this.position] = this.characters.GetEnumerator();
                    this.position--;

                    return this.MoveNext();
                }
            }
        }

        public void Dispose()
        {
            this.position = -1;
            this.buffer = null;
        }

        public void Reset()
        {
            throw new NotSupportedException();
        }
    }

    public override IList<IEnumerator<string>> GetPartitions(int partitionCount)
    {
        IEnumerator<string>[] enumerators = new IEnumerator<string>[partitionCount];

        List<char> characters = new List<char>(this.generator.Characters);

        int length = this.generator.Length;

        int characterCount = this.generator.Characters.Count;

        int steps = Math.Min(characterCount, partitionCount);

        int skip = characterCount / steps;

        for (int i = 0; i < steps; i++)
        {
            char c = characters[i * skip];

            char[] state = new string(c, length).ToCharArray();
            char[] limit = null;

            if ((i + 1) * skip < characterCount)
            {
                c = characters[(i + 1) * skip];
                limit = new string(c, length).ToCharArray();
            }

            if (i == steps - 1)
            {
                limit = null;
            }

            enumerators[i] = new PartialWordEnumerator(characters, state, limit);
        }

        for (int i = steps; i < partitionCount; i++)
        {
            enumerators[i] = Enumerable.Empty<string>().GetEnumerator();
        }

        return enumerators;
    }

1 Ответ

2 голосов
/ 30 ноября 2011

РЕДАКТИРОВАТЬ : Я считаю, что нашел решение.Согласно документации на IEnumerable.MoveNext (выделено):

Если MoveNext проходит конец коллекции, перечислитель располагается после последнего элемента в коллекции и MoveNextвозвращает ложьКогда перечислитель находится в этой позиции, последующие вызовы MoveNext также возвращают false до тех пор, пока Reset не будет вызван .

Согласно следующей логике:

    private bool ReachedLimit
    {
        get
        {
            if (this.limit == null)
            {
                return false;
            }

            for (int i = 0; i < this.buffer.Length; i++)
            {
                if (this.buffer[i] != this.limit[i])
                {
                    return false;
                }
            }

            return true;
        }
    }

Вызов MoveNext() вернет false только один раз - когда буфер точно равен пределу.После того, как вы превысили лимит, возвращаемое значение из ReachedLimit снова станет ложным, и return !this.ReachedLimit вернет true, так что перечислитель будет продолжаться после конца лимита до тех пор, пока у него не закончатся символы для перечисления.,По-видимому, в реализации ParallelQuery.Count(), MoveNext() вызывается несколько раз, когда он достигает конца, и, поскольку он снова начинает возвращать истинное значение, перечислитель с радостью продолжает возвращать больше элементов (в вашем случае это не так).пользовательский код, который обходит перечислитель вручную и, по-видимому, также не относится к вызову ForAll, поэтому они «случайно» возвращают правильные результаты).

Самое простое решение - запомнить возвращаемое значениес MoveNext() как только оно становится ложным:

private bool _canMoveNext = true;
public bool MoveNext()
{
    if (!_canMoveNext) return false;
    ...

        if (this.position == this.buffer.Length)
        {
            if (this.ReachedLimit) _canMoveNext = false;
    ...
}

Теперь, когда оно начинает возвращать ложное значение, оно будет возвращать ложное значение для каждого будущего вызова, и это возвращает правильный результат из AsParallel().Count().Надеюсь, что это поможет!


Документация по Замечания по Partitioner (выделено мной):

Статические методы на Partitioner являются поточно-ориентированными и могутиспользоваться одновременно из нескольких потоков.Тем не менее, пока используется созданный разделитель, базовый источник данных не следует изменять , будь то из того же потока, который использует разделитель, или из отдельного потока.

Из того, что я могу понять из кода, который вы дали, может показаться, что ParallelQuery.Count(), скорее всего, будет иметь проблемы с безопасностью потоков, поскольку возможно, что он может повторять несколько перечислителей одновременно, тогда как для всех других решений требуются перечислителибыть запущен синхронизирован.Не видя код, который вы используете для MyGenerator и MyPartitioner, сложно определить, могут ли проблемы безопасности потоков быть причиной.


Для демонстрации я написал простой перечислитель, который возвращаетпервые сто чисел в виде строк.Кроме того, у меня есть разделитель, который распределяет элементы в базовом перечислителе по совокупности numPartitions отдельных списков.Используя все методы, которые вы описали выше, на нашем 12-ядерном сервере (когда я вывожу numPartitions, он использует 12 по умолчанию на этой машине), я получаю ожидаемый результат 100 (это код LINQPad-ready):

void Main()
{
    var partitioner = new SimplePartitioner(GetEnumerator());

    GetEnumerator().Count().Dump();

    partitioner.GetPartitions(10).Select(enumerator =>
    {
        int count = 0;
        while (enumerator.MoveNext())
        {
            count++;
        }
        return count;
    }).Sum().Dump();

    var theCount = 0;
    partitioner.AsParallel().ForAll(e => Interlocked.Increment(ref theCount));
    theCount.Dump();

    partitioner.AsParallel().Count().Dump();
}

// Define other methods and classes here
public IEnumerable<string> GetEnumerator()
{
    for (var i = 1; i <= 100; i++)
        yield return i.ToString();
}

public class SimplePartitioner : Partitioner<string>
{
    private IEnumerable<string> input;
    public SimplePartitioner(IEnumerable<string> input)
    {
        this.input = input;
    }

    public override IList<IEnumerator<string>> GetPartitions(int numPartitions)
    {
        var list = new List<string>[numPartitions];
        for (var i = 0; i < numPartitions; i++)
            list[i] = new List<string>();
        var index = 0;
        foreach (var s in input)
            list[(index = (index + 1) % numPartitions)].Add(s);

        IList<IEnumerator<string>> result = new List<IEnumerator<string>>();
        foreach (var l in list)
            result.Add(l.GetEnumerator());
        return result;
    }
}

Вывод:

100
100
100
100

Это явно работает.Без дополнительной информации невозможно сказать, что не работает в вашей конкретной реализации.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...