Параллельная обработка входящих XML-файлов - PullRequest
1 голос
/ 06 февраля 2012

Мне нужно обработать входящие XML-файлы (они будут созданы другим приложением непосредственно в определенной папке), и мне нужно сделать это быстро.

Может быть до 200 000 файлов в день, и мое текущее предположение - использовать .NET 4 и tpl.

Моя текущая концепция обслуживания:

В цикле я хочу проверить папку на наличие новых файлов, если я найду какой-либо из них, я помещу их в очередь, которая будет обработана другим циклом, который будет брать файлы из очереди и создавать для каждого из них новую задачу. (нить). Количество одновременных задач должно быть настраиваемым. Первая часть проста, но создание двух основных циклов с очередью между ними - что-то новое для меня.

И вопрос: Как создать два цикла (один для проверки папки и добавления файлов, а второй - для извлечения файлов из очереди и их параллельной обработки) и добавления очереди для связи между ними.

Для первой части (проверка папки) предлагаемое решение - использовать FileSystemWatcher. Теперь нужно обсудить вторую часть (возможно, какой-нибудь планировщик заданий).

Ответы [ 5 ]

3 голосов
/ 07 февраля 2012

Похоже, что недостающий кусочек в вашей загадке - BlockingCollection:

FileSystemWatcher watcher;
BlockingCollection<string> bc; 
private readonly object _lock = new object();
Task[] tasks;

void PrepareWatcher()
{
    watcher = new FileSystemWatcher(@"c:");
    watcher.Created += (s,e) => 
    {
        lock(_lock) //Prevents race condition when stopping
        {
            if (!bc.IsAddingCompleted)
                bc.Add(e.FullPath);
        }
    };
}

void StartProcessing(int taskCount)
{
    tasks = new Task[taskCount];
    bc = new BlockingCollection<string>();

    for (int i = 0; i< taskCount; i++)
        tasks[i] = (Task.Factory.StartNew(() =>
        {
            foreach (var x in bc.GetConsumingEnumerable())
                ProcessXml(x);
        }, TaskCreationOptions.LongRunning)); 

    watcher.EnableRaisingEvents = true;
}

void ProcessXml(string path)
{
    //Do your processing here...
    //Note many events will be called multiple times, see:
    //http://weblogs.asp.net/ashben/archive/2003/10/14/31773.aspx
}

void StopProcessing()
{
    watcher.EnableRaisingEvents = false;

    lock (_lock) //The above line doesn't guarantee no more events will be called,
                 //And Add() and CompleteAdding() can't be called concurrently
        bc.CompleteAdding(); 

    Task.WaitAll(tasks);
    foreach (var task in tasks)
        task.Dispose();
    bc.Dispose();
    tasks = null;
}
2 голосов
/ 04 апреля 2012

Я весьма удивлен, что никто еще не спросил, но учитывая то, что вы пытаетесь достичь, является своего рода обменом сообщениями между двумя приложениями, вы рассматривали возможность использования WCF?

0 голосов
/ 06 февраля 2012

IMO, что вы хотите, это что-то вроде cron работа.Версия алгоритма может быть такой:

for every job (called periodically via cron/scheduler) run

   //
   // your program
   //
   if job_is_running {
      // Still busy...
      // don't process anything and just return back
      return
   }

   // Create your array
   //
   Array a = new Array()
   for each file in folder {
      a.append(file)
   }

   // Process each file
   //
   for each item in a {
     process_item(item);

     // Move it (or delete)
     //
     remove_from_input_folder(item)
   }

Теперь вы можете вызвать remove_from input() перед обработкой, чтобы избежать двойной обработки в случае сбоя системы.

Мне пришлось сделать что-то подобноедля телефонной компании некоторое время назад, и это было самое удобное решение, которое мы получили:)

Обновление : параллельный бит

Теоретически, циклически просматривая файлы для построения массиванезначительный по сравнению с фактической обработкой.Таким образом, вы можете легко преобразовать второй цикл в параллельный вариант на основе рабочего.

HTH

0 голосов
/ 06 февраля 2012

Возможно, петли не нужны, параллель тоже не нужна. Это было бы полезно, если вы хотите обработать пакет новых файлов. FileSystemWatcher в папке, где появятся новые файлы, даст вам событие для добавления файла в очередь.

Добавить событие для элемента, добавленного в очередь, чтобы запустить поток для обработки отдельного файла.

Если вы выберете простой класс, Файл, состояние, обнаруженное время и т. Д.

У вас есть поток обнаружения, добавляющий в очередь, пул потоков для их обработки и, при успешном удалении, из очереди.

Этот предыдущий вопрос может оказаться полезным для "безопасных" списков в .net 4

Потокобезопасный список свойство

Особенно, если вы хотите обработать все новые файлы начиная с X.

Обратите внимание, что если вы не собираетесь использовать FileSystem watcher и просто получать файлы из папки, то Обработанная папка для их перемещения и, возможно, также и Папка с ошибками, была бы хорошей идеей. Считывание 200,00 имен файлов, чтобы проверить, обработали ли вы их, отчасти исключило бы какую-либо выгоду от их параллельной обработки.

Даже если вы это сделаете, я бы порекомендовал это. Простое перемещение его обратно в To Process (или после редактирования в случае сбоев) приведет к повторной обработке. Еще одно преимущество, скажем, если вы выполняете обработку в базе данных, и все это соскакивает вверх, а ваша последняя резервная копия была в X. Вы восстанавливаете, а затем просто перемещаете все файлы, которые вы обработали, обратно в папку «toprocess».

Вы также можете выполнять тестовые прогоны с известным вводом и проверять состояние БД до и после.

Далее к комментарию.

В ThreadPool, который используется Task, установлен предел ThreadPool, предназначенный для всех или фоновых задач в вашем приложении.

После комментария.

Если вы хотите ограничить количество одновременных задач ...

Стартер на десять, который вы можете легко улучшить для настройки и повышения.

В вашем классе, который управляет выпуском задач из очереди файлов, что-то вроде

private object _canRunLock;
private int _maxTasks;
private int _activeTasks;

public MyTaskManager(int argMaxTasks)
{
  _maxTasks = argMaxTasks;
  _canRunLock = new object();
  _activeTasks = 0;
}


public bool CanRunTask(MyTask argTask)
{
  get
  {
    lock(_canRunLock)
    {
      if (_activeTasks < _maxTasks)
      {
        ExecuteTask(argTask);
        _activeTasks++;
        return true;
      }
    }
    return false;
  }
}

public void TaskCompleted()
{
  lock(_canRunLock)
  {
    if (_activeTasks > 0)
    {
      _activeTasks--;
    }
    else
    {
      throw new WTFException("Okay how did this happen?");
    }
  }
}

Просто и безопасно (я думаю). Вы можете сделать еще одно свойство приостановить или отключить, чтобы проверить. Возможно, вы захотите сделать вышеперечисленное синглтоном (:() или, по крайней мере, иметь в виду, что если вы запустите более одного ...

Лучший совет, который я могу дать, это начать с простого, открытого и отсоединенного, а затем усложнить при необходимости, легко начать преждевременную оптимизацию здесь. Хорошая идея - не загружать все потоки, ожидающие, скажем, FileSystem или бэкэнда, но я сомневаюсь, что количество процессоров когда-нибудь станет узким местом, так что ваши maxTasks немного в воздухе. Какая-то самостоятельная настройка между нижним и верхним пределами может быть хорошей вещью, а не одним фиксированным числом.

0 голосов
/ 06 февраля 2012

Я думаю, вы можете проверить, что новый файл поступает от FileSystemWatcher. Есть статья на http://www.codeproject.com/Articles/25443/Watching-Folder-Activity-in-C-NET.

FileSystemWatcher поможет вам не зацикливаться в определенной папке.

Надеюсь, эта помощь.

...