Проблема блокировки потока с FileHelpers между вызовом метода engine.ReadNext () и свойством readign engine.LineNumber - PullRequest
1 голос
/ 12 апреля 2010

Я использую шаблон производителя / потребителя с библиотекой FileHelpers для импорта данных из одного файла (который может быть огромным), используя несколько потоков. Предполагается, что каждый поток импортирует часть этого файла, и я хотел бы использовать свойство LineNumber экземпляра FileHelperAsyncEngine, который считывает файл в качестве первичного ключа для импортированных строк. FileHelperAsyncEngine внутренне имеет IEnumerator IEnumerable.GetEnumerator (); который повторяется с использованием метода engine.ReadNext (). Это внутренне устанавливает свойство LineNumber (которое не является потокобезопасным).

Потребители будут иметь связанных с ними производителей, которые будут предоставлять DataTables потребителям, которые будут использовать их через класс SqlBulkLoad, который будет использовать реализацию IDataReader, которая будет перебирать коллекцию DataTables, которые являются внутренними для экземпляра Consumer. Каждый экземпляр будет иметь один экземпляр SqlBulkCopy, связанный с ним.

У меня проблема с блокировкой нитей. Ниже описано, как создать несколько потоков продюсера. Я начинаю каждую тему после слов. Будет вызван метод Produce для экземпляра производителя, определяющий, какой фрагмент входного файла будет обработан. Кажется, что engine.LineNumber не является потокобезопасным, и я не импортирую правильный LineNumber в базу данных. Кажется, что к тому времени engine.LineNumber читается другим потоком с именем engine.ReadNext () и изменяет свойство engine.LineNumber. Я не хочу блокировать цикл, который должен обрабатывать часть входного файла, потому что я теряю параллелизм. Как реорганизовать код для решения этой проблемы с потоками?

Спасибо Рад

            for (int i = 0; i < numberOfProducerThreads; i++)
            DataConsumer consumer = dataConsumers[i];

            //create a new producer
            DataProducer producer = new DataProducer();

            //consumer has already being created
            consumer.Subscribe(producer);

            FileHelperAsyncEngine orderDetailEngine = new FileHelperAsyncEngine(recordType);
            orderDetailEngine.Options.RecordCondition.Condition = RecordCondition.ExcludeIfBegins;
            orderDetailEngine.Options.RecordCondition.Selector = STR_ORDR;

            int skipLines = i * numberOfBufferTablesToProcess * DataBuffer.MaxBufferRowCount;

            Thread newThread = new Thread(() =>
            {
                producer.Produce(consumer, inputFilePath, lineNumberFieldName, dict, orderDetailEngine, skipLines, numberOfBufferTablesToProcess);
                consumer.SetEndOfData(producer);
            }); 
            producerThreads.Add(newThread); thread.Start();}

    public void Produce(DataConsumer consumer, string inputFilePath, string lineNumberFieldName, Dictionary<string, object> dict, FileHelperAsyncEngine engine, int skipLines, int numberOfBufferTablesToProcess)
    {
        lock (this)
        {
            engine.Options.IgnoreFirstLines = skipLines;
            engine.BeginReadFile(inputFilePath);
        }

        int rowCount = 1;

        DataTable buffer = consumer.BufferDataTable;
        while (engine.ReadNext() != null)
        {
            lock (this)
            {
                dict[lineNumberFieldName] = engine.LineNumber;
                buffer.Rows.Add(ObjectFieldsDataRowMapper.MapObjectFieldsToDataRow(engine.LastRecord, dict, buffer));
                if (rowCount % DataBuffer.MaxBufferRowCount == 0)
                {
                    consumer.AddBufferDataTable(buffer);
                    buffer = consumer.BufferDataTable;
                }
                if (rowCount % (numberOfBufferTablesToProcess * DataBuffer.MaxBufferRowCount) == 0)
                {
                    break;
                }
                rowCount++;
            }
        }
        if (buffer.Rows.Count > 0)
        {
            consumer.AddBufferDataTable(buffer);
        }
        engine.Close();
    }

Ответы [ 3 ]

2 голосов
/ 12 апреля 2010

Словарь <> не является потокобезопасным. Правильно ли заблокирован словарь в вышеприведенном коде или он используется только в вашей блокировке (это)?

В качестве отступления я бы избежал парадигмы блокировки (this) и использовал бы универсальные объекты для блокировки вашего кода. Вы можете столкнуться с другими проблемами блокировки, не связанными с конкретными ресурсами. Я подробно описываю эту проблему в своем блоге ( Smart Resource Locking в C # .Net для Thread Safe Code ). НТН

1 голос
/ 03 мая 2010

Вы правы, LineNumber не является потокобезопасным: (

Я просто исследовал код и обнаружил, что мы читаем строку из нашего внутреннего считывателя, а затем обновляем LineNumber, чтобы вообще не обеспечивать безопасность потока.

Проблема в том, что если мы добавим некоторый код sincronization внутри, мы сможем сделать вещи действительно более медленными, возможно, нам нужно создать поточно-ориентированную версию внутреннего кода, чтобы избежать этих издержек.

В любом случае, я думаю, что с точки зрения производительности более медленная часть кода - это файловые операции, поэтому вы не ускоряете использование нескольких потоков для чтения. Возможно, вам нужен только один поток, чтобы прочитать файл в рабочую очередь и иметь несколько потоков, которые читают его и работают с каждой записью, в этом случае вы получаете безопасность потока, которая вам нужна

Приветствия

0 голосов
/ 13 апреля 2010

Я думаю, что исправил проблему. Это был словарь <>, который нуждался в блокировке

замок (дикт) { dict [lineNumberFieldName] = engine.LineNumber; buffer.Rows.Add (ObjectFieldsDataRowMapper.MapObjectFieldsToDataRow (engine.LastRecord, dict, buffer)); } Спасибо OmegaMan за хорошую подсказку.

...