Несколько потоков заполняют свой результат в одной DataTable C # - PullRequest
3 голосов
/ 13 июня 2009

Я только начинаю изучать концепцию потоков, и я как бы застрял в этой одной проблеме, она сводит меня с ума ....

Что мне действительно нужно сделать -

У меня есть около 300 текстовых файлов в локальном каталоге, которые нужно проанализировать на предмет определенных значений ... После того, как я найду эти "значения" в каждом текстовом файле, мне нужно сохранить их в базе данных .. Поэтому я следовал простому подходу к доступу к каждому текстовому файлу в каталоге - анализу и обновлению результирующих значений в виде строки в локальном DataTable, и когда я закончу анализ всех файлов и сохраню 300 строк в DataTable, я сделаю SQLBulkCopy DataTable для моей базы данных. Этот подход работает отлично, за исключением того, что мне требуется около 10 минут для запуска моего кода!

Что я пытаюсь сделать сейчас -

Создайте новый поток для каждого файла и сохраняйте количество потоков ниже 4 в любой момент времени ... тогда каждый поток будет анализировать файл и возвращать строку для обновления локальной таблицы данных

Где я застрял - я не понимаю, как обновить этот единственный Datatable, который получает строки из нескольких потоков ...

Довольно объяснение, не правда ли ... надеюсь, кто-то здесь может предложить хорошую идею для этого ...

Спасибо, Nidhi

Ответы [ 5 ]

6 голосов
/ 18 июня 2009

Как уже указывалось, вам нужно точно определить, где находится ваше узкое место и почему вы используете многопоточность.

При переходе к нескольким потокам у вас есть потенциал для повышения производительности. Однако, если вы обновляете один и тот же DataTable для каждого потока, вы ограничены DataTable. Только один поток может записывать в DataTable одновременно (который вы контролируете с помощью блокировки), так что вы по-прежнему принципиально обрабатываете последовательно.

С другой стороны, большинство баз данных предназначены для нескольких подключений, работающих в нескольких потоках, и для этой цели были сильно настроены. Если вы все еще хотите использовать несколько потоков: пусть каждый поток имеет свое собственное соединение с базой данных и выполняет свою собственную обработку.

Теперь, в зависимости от типа выполняемой обработки, ваше узкое место может заключаться в открытии и обработке файла, а не в обновлении базы данных.

Один из способов разделить вещи:

  1. Поместить все имена файлов для обработки в очередь имен файлов.
  2. Создайте поток (или потоки), чтобы извлечь элемент из очереди имени файла, открыть, проанализировать и обработать файл и поместить результаты в очередь результатов.
  3. Попросите другой поток взять результаты из очереди результатов и вставить их в базу данных.

Они могут работать одновременно ... база данных не будет обновляться до тех пор, пока что-то будет обновляться, а просто будет ждать в это время.

Этот подход позволяет вам действительно знать, кто кого ждет. Если часть файла чтения / обработки медленная, создайте больше потоков для этого. Если вставка в часть базы данных происходит медленно, создайте больше потоков для этого. Очереди просто нужно синхронизировать.

Итак, псевдокод:

Queue<string> _filesToProcess = new Queue<string>();
Queue<string> _results = new Queue<string>();
Thread _fileProcessingThread = new Thread( ProcessFiles );
Thread _databaseUpdatingThread = new Thread( UpdateDatabase );
bool _finished = false;

static void Main()
{
    foreach( string fileName in GetFileNamesToProcess() )
    {
       _filesToProcess.Enqueue( fileName );
    }

    _fileProcessingThread.Start();
    _databaseUpdatingThread.Start();

    // if we want to wait until they're both finished
    _fileProcessingThread.Join();
    _databaseUpdatingThread.Join();

    Console.WriteLine( "Done" );
}

void ProcessFiles()
{
   bool filesLeft = true;

   lock( _filesToProcess ){ filesLeft = _filesToProcess.Count() > 0; }

   while( filesLeft )
   {
      string fileToProcess;
      lock( _filesToProcess ){ fileToProcess = _filesToProcess.Dequeue(); }

      string resultAsString = ProcessFileAndGetResult( fileToProcess );

      lock( _results ){ _results.Enqueue( resultAsString ); }

      Thread.Sleep(1); // prevent the CPU from being 100%

      lock( _filesToProcess ){ filesLeft = _filesToProcess.Count() > 0; }
   }

   _finished = true;
}

void UpdateDatabase()
{
   bool pendingResults = false;

   lock( _results ){ pendingResults = _results.Count() > 0; }

   while( !_finished || pendingResults )
   {
      if( pendingResults )
      {
         string resultsAsString;
         lock( _results ){ resultsAsString = _results.Dequeue(); }

         InsertIntoDatabase( resultsAsString ); // implement this however
      }

      Thread.Sleep( 1 ); // prevents the CPU usage from being 100%

      lock( _results ){ pendingResults = _results.Count() > 0; }
   }
}

Я почти уверен, что есть способы сделать это "лучше", но это должно помочь, чтобы вы могли читать и обрабатывать данные, а также добавлять завершенные данные в базу данных и использовать преимущества многопоточности.

Если вы хотите, чтобы другой поток обрабатывал файлы или обновлял базу данных, просто создайте новый поток (MethodName) и вызовите Start ().

Это не самый простой пример, но я думаю, что он тщательный. Вы синхронизируете две очереди, и вам необходимо убедиться, что каждая из них заблокирована перед доступом. Вы отслеживаете, когда должен завершиться каждый поток, и данные распределяются между потоками, но никогда не обрабатываются более одного раза с использованием очередей.

Надеюсь, это поможет.

4 голосов
/ 13 июня 2009

Это будет намного проще, если вы просто позволите каждому из четырех потоков писать в базу данных самостоятельно. В этом сценарии вам не нужно беспокоиться о потоках (за исключением файлов, над которыми работает каждый поток), поскольку каждый рабочий поток может поддерживать свои собственные данные и использовать 25% файлов.

В качестве альтернативы, вы можете иметь одну таблицу данных, которую используют все потоки - просто убедитесь, что доступ к ней обернут блокировкой, например:

lock(YourTable.Rows.SyncRoot){
  // add rows to table
}

Конечно, это все спорно, если узким местом является диск, как отмечает @David B.

1 голос
/ 13 июня 2009

Что заставило вас думать, что большее количество потоков улучшит ситуацию? Они, вероятно, не будут.

Я предлагаю вам сначала заставить программу работать, а потом беспокоиться о том, чтобы она работала быстрее. Делайте это только с одной нитью.

0 голосов
/ 13 июня 2009

Как уже отмечали другие, не забудьте заблокировать свою таблицу перед обновлением. C #:

private object tableLock;

/*
Later in code.
*/

private void UpdateDataTable(object data)
{
    lock(tableLock)
    {
          //Add or update table rows
    }
}

Что касается методов фактического контроля и поддержания потоков в строке, просто используйте объект ThreadPool, установите максимальное количество потоков в своем пределе, и организация очереди может позаботиться обо всем. Для дополнительного контроля вы можете добавить некоторую логику, которая использует массив объектов WaitHandle. На самом деле это может быть хорошей идеей, если учесть, что вы хотите поставить в очередь 300 отдельных объектов.

0 голосов
/ 13 июня 2009

SQLBulkCopy - большой молот для всего 300 строк.

Извлечение Умный пул потоков . Это пул потоков экземпляра, который можно очень легко ограничить до 4 потоков. Поскольку у вас есть только 300 строк, рассмотрите возможность отправки их непосредственно в SQL в каждом потоке, а не для агрегирования в вашем коде.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...