Я начинаю писать свои первые параллельные приложения. Этот разделитель будет перечислять более чем IDataReader
извлекающих chunkSize
записей одновременно из источника данных.
TLDR; версия
private object _Lock = new object();
public IEnumerator GetEnumerator()
{
var infoSource = myInforSource.GetEnumerator();
//Will this cause a deadlock if two threads
lock (_Lock) //use the enumator at the same time?
{
while (infoSource.MoveNext())
{
yield return infoSource.Current;
}
}
}
полный код
protected class DataSourcePartitioner<object[]> : System.Collections.Concurrent.Partitioner<object[]>
{
private readonly System.Data.IDataReader _Input;
private readonly int _ChunkSize;
public DataSourcePartitioner(System.Data.IDataReader input, int chunkSize = 10000)
: base()
{
if (chunkSize < 1)
throw new ArgumentOutOfRangeException("chunkSize");
_Input = input;
_ChunkSize = chunkSize;
}
public override bool SupportsDynamicPartitions { get { return true; } }
public override IList<IEnumerator<object[]>> GetPartitions(int partitionCount)
{
var dynamicPartitions = GetDynamicPartitions();
var partitions =
new IEnumerator<object[]>[partitionCount];
for (int i = 0; i < partitionCount; i++)
{
partitions[i] = dynamicPartitions.GetEnumerator();
}
return partitions;
}
public override IEnumerable<object[]> GetDynamicPartitions()
{
return new ListDynamicPartitions(_Input, _ChunkSize);
}
private class ListDynamicPartitions : IEnumerable<object[]>
{
private System.Data.IDataReader _Input;
int _ChunkSize;
private object _ChunkLock = new object();
public ListDynamicPartitions(System.Data.IDataReader input, int chunkSize)
{
_Input = input;
_ChunkSize = chunkSize;
}
public IEnumerator<object[]> GetEnumerator()
{
while (true)
{
List<object[]> chunk = new List<object[]>(_ChunkSize);
lock(_Input)
{
for (int i = 0; i < _ChunkSize; ++i)
{
if (!_Input.Read())
break;
var values = new object[_Input.FieldCount];
_Input.GetValues(values);
chunk.Add(values);
}
if (chunk.Count == 0)
yield break;
}
var chunkEnumerator = chunk.GetEnumerator();
lock(_ChunkLock) //Will this cause a deadlock?
{
while (chunkEnumerator.MoveNext())
{
yield return chunkEnumerator.Current;
}
}
}
}
IEnumerator IEnumerable.GetEnumerator()
{
return ((IEnumerable<object[]>)this).GetEnumerator();
}
}
}
Я хотел, чтобы IEnumerable
объект, который он передал обратно, был поточно-безопасным (пример MSDN был таким, так что я предполагаю, что PLINQ и TPL могут понадобиться) обеспечит ли блокировка на _ChunkLock
возле нижней справки безопасность потока или это приведет к тупику? Из документации я не мог сказать, будет ли снята блокировка на yeld return
.
Также, если в .net есть встроенная функциональность, которая будет делать то, что я пытаюсь сделать, я бы скорее использовал это. И если вы обнаружите какие-либо другие проблемы с кодом, я был бы признателен.