Я использую шаблон производителя / потребителя с библиотекой FileHelpers для импорта данных из одного файла (который может быть огромным), используя несколько потоков. Предполагается, что каждый поток импортирует часть этого файла, и я хотел бы использовать свойство LineNumber экземпляра FileHelperAsyncEngine, который считывает файл в качестве первичного ключа для импортированных строк.
FileHelperAsyncEngine внутренне имеет IEnumerator IEnumerable.GetEnumerator ();
который повторяется с использованием метода engine.ReadNext (). Это внутренне устанавливает свойство LineNumber (которое не является потокобезопасным).
Потребители будут иметь связанных с ними производителей, которые будут предоставлять DataTables потребителям, которые будут использовать их через класс SqlBulkLoad, который будет использовать реализацию IDataReader, которая будет перебирать коллекцию DataTables, которые являются внутренними для экземпляра Consumer. Каждый экземпляр будет иметь один экземпляр SqlBulkCopy, связанный с ним.
У меня проблема с блокировкой нитей. Ниже описано, как создать несколько потоков продюсера. Я начинаю каждую тему после слов. Будет вызван метод Produce для экземпляра производителя, определяющий, какой фрагмент входного файла будет обработан.
Кажется, что engine.LineNumber не является потокобезопасным, и я не импортирую правильный LineNumber в базу данных. Кажется, что к тому времени engine.LineNumber читается другим потоком с именем engine.ReadNext () и изменяет свойство engine.LineNumber. Я не хочу блокировать цикл, который должен обрабатывать часть входного файла, потому что я теряю параллелизм. Как реорганизовать код для решения этой проблемы с потоками?
Спасибо
Рад
for (int i = 0; i < numberOfProducerThreads; i++)
DataConsumer consumer = dataConsumers[i];
//create a new producer
DataProducer producer = new DataProducer();
//consumer has already being created
consumer.Subscribe(producer);
FileHelperAsyncEngine orderDetailEngine = new FileHelperAsyncEngine(recordType);
orderDetailEngine.Options.RecordCondition.Condition = RecordCondition.ExcludeIfBegins;
orderDetailEngine.Options.RecordCondition.Selector = STR_ORDR;
int skipLines = i * numberOfBufferTablesToProcess * DataBuffer.MaxBufferRowCount;
Thread newThread = new Thread(() =>
{
producer.Produce(consumer, inputFilePath, lineNumberFieldName, dict, orderDetailEngine, skipLines, numberOfBufferTablesToProcess);
consumer.SetEndOfData(producer);
});
producerThreads.Add(newThread); thread.Start();}
public void Produce(DataConsumer consumer, string inputFilePath, string lineNumberFieldName, Dictionary<string, object> dict, FileHelperAsyncEngine engine, int skipLines, int numberOfBufferTablesToProcess)
{
lock (this)
{
engine.Options.IgnoreFirstLines = skipLines;
engine.BeginReadFile(inputFilePath);
}
int rowCount = 1;
DataTable buffer = consumer.BufferDataTable;
while (engine.ReadNext() != null)
{
lock (this)
{
dict[lineNumberFieldName] = engine.LineNumber;
buffer.Rows.Add(ObjectFieldsDataRowMapper.MapObjectFieldsToDataRow(engine.LastRecord, dict, buffer));
if (rowCount % DataBuffer.MaxBufferRowCount == 0)
{
consumer.AddBufferDataTable(buffer);
buffer = consumer.BufferDataTable;
}
if (rowCount % (numberOfBufferTablesToProcess * DataBuffer.MaxBufferRowCount) == 0)
{
break;
}
rowCount++;
}
}
if (buffer.Rows.Count > 0)
{
consumer.AddBufferDataTable(buffer);
}
engine.Close();
}