Мой GetEnumerator вызывает тупик? - PullRequest
0 голосов
/ 29 мая 2010

Я начинаю писать свои первые параллельные приложения. Этот разделитель будет перечислять более чем IDataReader извлекающих chunkSize записей одновременно из источника данных.

TLDR; версия

private object _Lock = new object();
public IEnumerator GetEnumerator()
{
    var infoSource = myInforSource.GetEnumerator();
                   //Will this cause a deadlock if two threads 
    lock (_Lock)   //use the enumator at the same time?
    {
        while (infoSource.MoveNext())
        {
            yield return infoSource.Current;
        }
    }
}

полный код

protected class DataSourcePartitioner<object[]> : System.Collections.Concurrent.Partitioner<object[]>
{
    private readonly System.Data.IDataReader _Input;
    private readonly int _ChunkSize;
    public DataSourcePartitioner(System.Data.IDataReader input, int chunkSize = 10000)
        : base()
    {
        if (chunkSize < 1)
            throw new ArgumentOutOfRangeException("chunkSize");
        _Input = input;
        _ChunkSize = chunkSize;
    }

    public override bool SupportsDynamicPartitions { get { return true; } }

    public override IList<IEnumerator<object[]>> GetPartitions(int partitionCount)
    {

        var dynamicPartitions = GetDynamicPartitions();
        var partitions =
            new IEnumerator<object[]>[partitionCount];

        for (int i = 0; i < partitionCount; i++)
        {
            partitions[i] = dynamicPartitions.GetEnumerator();
        }
        return partitions;


    }

    public override IEnumerable<object[]> GetDynamicPartitions()
    {
        return new ListDynamicPartitions(_Input, _ChunkSize);
    }
    private class ListDynamicPartitions : IEnumerable<object[]>
    {
        private System.Data.IDataReader _Input;
        int _ChunkSize;
        private object _ChunkLock = new object();
        public ListDynamicPartitions(System.Data.IDataReader input, int chunkSize)
        {
            _Input = input;
            _ChunkSize = chunkSize;
        }

        public IEnumerator<object[]> GetEnumerator()
        {

            while (true)
            {
                List<object[]> chunk = new List<object[]>(_ChunkSize);
                lock(_Input)
                {
                    for (int i = 0; i < _ChunkSize; ++i)
                    {
                        if (!_Input.Read())
                            break;
                        var values = new object[_Input.FieldCount];
                        _Input.GetValues(values);
                        chunk.Add(values);
                    }
                    if (chunk.Count == 0)
                        yield break;
                }
                var chunkEnumerator = chunk.GetEnumerator();
                lock(_ChunkLock) //Will this cause a deadlock?
                {
                    while (chunkEnumerator.MoveNext())
                    {
                        yield return chunkEnumerator.Current;
                    }
                }
            }
        }

        IEnumerator IEnumerable.GetEnumerator()
        {
            return ((IEnumerable<object[]>)this).GetEnumerator();
        }
    }
}

Я хотел, чтобы IEnumerable объект, который он передал обратно, был поточно-безопасным (пример MSDN был таким, так что я предполагаю, что PLINQ и TPL могут понадобиться) обеспечит ли блокировка на _ChunkLock возле нижней справки безопасность потока или это приведет к тупику? Из документации я не мог сказать, будет ли снята блокировка на yeld return.

Также, если в .net есть встроенная функциональность, которая будет делать то, что я пытаюсь сделать, я бы скорее использовал это. И если вы обнаружите какие-либо другие проблемы с кодом, я был бы признателен.

Ответы [ 2 ]

1 голос
/ 29 мая 2010

Одним словом: возможно *.

Если вы всегда используете этот код в контексте цикла foreach, то вряд ли вы нажмете тупик (если возможно, что ваш myInfoSource бесконечен или что Ваш цикл foreach содержит некоторый код, который никогда не прекратится), хотя вы можете увидеть замедления.

Более вероятная причина потенциального (фактически гарантированного) тупика была бы такова:

var myObject = new YourObject();
var enumerator = myObject.GetEnumerator();

// if you do this, and then forget about it...
enumerator.MoveNext();

// ...your lock will never be released

* Я основываю этот ответ на вашем начальном блоке кода.

1 голос
/ 29 мая 2010

Я написал тестовый фреймворк, он не блокируется, но второй поток никогда не получит данные.

static void Main()
{
    En en = new En();
    Task.Factory.StartNew(() =>
        {
            foreach (int i in en)
            {
                Thread.Sleep(100);
                Console.WriteLine("A:" + i.ToString());
            }
        });
    Task.Factory.StartNew(() =>
    {
        foreach (int i in en)
        {
            Thread.Sleep(10);
            Console.WriteLine("B:" +i.ToString());
        }
    });
    Console.ReadLine();
}

public class En : IEnumerable
{
    object _lock = new object();
    static int i = 0;
    public IEnumerator GetEnumerator()
    {
        lock (_lock)
        {
            while (true)
            {
                if (i < 10)
                    yield return i++;
                else
                    yield break;
            }
        }
    }
}

Возвращает

A:0
A:1
A:2
A:3
A:4
A:5
A:6
A:7
A:8
A:9

Вот обновленная версия GetEnumerator, которая должна вести себя правильно.

public IEnumerator<object[]> GetEnumerator()
{

    while (true)
    {
        List<object[]> chunk = new List<object[]>(_ChunkSize);
        _ChunkPos = 0;
        lock(_Input)
        {
            for (int i = 0; i < _ChunkSize; ++i)
            {
                if (!_Input.Read())
                    break;
                var values = new object[_Input.FieldCount];
                _Input.GetValues(values);
                chunk.Add(values);
            }
            if (chunk.Count == 0)
                yield break;
        }
        var chunkEnumerator = chunk.GetEnumerator();
        while (true)
        {
            object[] retVal;
            lock (_ChunkLock)
            {
                if (chunkEnumerator.MoveNext())
                {
                    retVal = chunkEnumerator.Current;
                }
                else 
                    break; //break out of chunk while loop.
            }
            yield return retVal;
        }
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...