Многопоточная обработка файлов с помощью .NET - PullRequest
16 голосов
/ 11 мая 2010

Существует папка, содержащая тысячи небольших текстовых файлов. Я стремлюсь проанализировать и обработать их все, пока в папку помещается больше файлов. Мое намерение состоит в том, чтобы выполнить многопоточность этой операции, поскольку однопоточному прототипу потребовалось шесть минут для обработки 1000 файлов.

Мне нравится, когда читатель и писатель пишут следующее. В то время как потоки читателей читают файлы, я хотел бы иметь потоки писателей для их обработки. Когда читатель начнет читать файл, я хотел бы пометить его как обрабатываемый, например, переименовав его. Как только он будет прочитан, переименуйте его в завершенный.

Как мне подойти к такому многопоточному приложению?

Лучше ли использовать распределенную хеш-таблицу или очередь?

Какую структуру данных использовать, чтобы избежать блокировок?

Есть ли лучший подход к этой схеме?

Ответы [ 6 ]

26 голосов
/ 11 мая 2010

Поскольку любопытно, как .NET 4 работает с этим в комментариях, вот такой подход. Извините, это скорее всего не вариант для ОП. Отказ от ответственности: это не очень научный анализ, просто показывающий, что есть явное преимущество в производительности. В зависимости от оборудования, ваш пробег может варьироваться в широких пределах.

Вот быстрый тест (если вы видите большую ошибку в этом простом тесте, это всего лишь пример. Пожалуйста, прокомментируйте, и мы можем исправить его, чтобы он был более полезным / точным). Для этого я просто поместил 12 000 ~ 60 КБ файлов в каталог в качестве образца (запустите LINQPad ; вы можете поиграть с ним самостоятельно, бесплатно! - не забудьте получить LINQPad 4, хотя ):

var files = 
Directory.GetFiles("C:\\temp", "*.*", SearchOption.AllDirectories).ToList();

var sw = Stopwatch.StartNew(); //start timer
files.ForEach(f => File.ReadAllBytes(f).GetHashCode()); //do work - serial
sw.Stop(); //stop
sw.ElapsedMilliseconds.Dump("Run MS - Serial"); //display the duration

sw.Restart();
files.AsParallel().ForAll(f => File.ReadAllBytes(f).GetHashCode()); //parallel
sw.Stop();
sw.ElapsedMilliseconds.Dump("Run MS - Parallel");

Незначительное изменение цикла для распараллеливания запроса - это все, что нужно для большинство простые ситуации . Под «простым» я в основном подразумеваю, что результат одного действия не влияет на следующее. Чаще всего следует помнить, что некоторые коллекции, например, наша удобная List<T>, не безопасна для потоков , поэтому использование ее в параллельном сценарии это хорошая идея :) К счастью, в .NET 4 были добавлены одновременные коллекции, которые являются поточно-ориентированными. Также имейте в виду, что если вы используете блокирующую коллекцию, это может быть узким местом, в зависимости от ситуации.

Используются расширения .AsParallel<T>(IEnumeable<T>) и .ForAll<T>(ParallelQuery<T>), доступные в .NET 4.0. Вызов .AsParallel() включает IEnumerable<T> в ParallelEnumerableWrapper<T> (внутренний класс), который реализует ParallelQuery<T>. Теперь это позволяет вам использовать методы параллельного расширения , в данном случае мы используем .ForAll().

.ForAll() внутренне создает ForAllOperator<T>(query, action) и запускает его синхронно. Это обрабатывает многопоточность и объединение потоков после того, как они запущены ... Там довольно много происходит, я бы предложил начинать здесь, если вы хотите узнать больше, включая дополнительные опции .


Результаты (Компьютер 1 - Физический жесткий диск):

  • Серийный номер: 1288 - 1333мс
  • Параллельно: 461 - 503 мс

Характеристики компьютера - для сравнения:

Результаты (Компьютер 2 - Твердотельный накопитель):

  • Серийный номер: 545 - 601 мс
  • Параллельно: 248 - 278 мс

Характеристики компьютера - для сравнения:

  • Quad Core 2 Quad Q9100 @ 2,26 ГГц
  • 8 ГБ ОЗУ (DDR 1333)
  • 120 ГБ OCZ Vertex SSD (Стандартная версия - прошивка 1.4)

На этот раз у меня нет ссылок на ЦП / ОЗУ, они были установлены. Это ноутбук Dell M6400 ( вот ссылка на M6500 ... собственные ссылки Dell на на 6400 не работают ).


Эти числа взяты из 10 прогонов, принимая мин / макс внутренних 8 результатов (удаляя исходные мин / макс для каждого возможного выброса). Здесь мы сталкиваемся с узким местом ввода / вывода, особенно на физическом диске, но подумайте о том, что делает последовательный метод. Он читает, обрабатывает, читает, обрабатывает, промыть, повторить. При параллельном подходе вы (даже с узким местом ввода / вывода) считываете и обрабатываете одновременно . В худшем случае вы обрабатываете один файл, а читаете следующий. Одно это (на любом текущем компьютере!) Должно привести к увеличению производительности * . Вы можете видеть, что в приведенных выше результатах мы можем получить чуть больше, чем один раз, давая нам здоровый импульс.

Еще один отказ от ответственности: Quad Core + .NET 4 параллель не даст вам четырехкратного увеличения производительности, он не масштабируется линейно ... Есть и другие соображения и узкие места в игре.

Надеюсь, это было интересно показать подход и возможные выгоды. Не стесняйтесь критиковать или улучшать ... Этот ответ существует только для любопытных, как указано в комментариях:)

6 голосов
/ 11 мая 2010

Дизайн

Шаблон Производитель / Потребитель, вероятно, будет наиболее полезным в этой ситуации. Вы должны создать достаточно потоков, чтобы максимизировать пропускную способность.

Вот несколько вопросов о шаблоне «производитель / потребитель», чтобы дать вам представление о том, как он работает:

Вам следует использовать блокирующую очередь, и производитель должен добавлять файлы в очередь, пока потребители обрабатывают файлы из очереди. Очередь блокировки не требует блокировки, поэтому она является наиболее эффективным способом решения вашей проблемы.

Если вы используете .NET 4.0, существует несколько одновременных коллекций , которые вы можете использовать "из коробки":

1031 * Threading * Один поток производителя, вероятно, будет наиболее эффективным способом загрузки файлов с диска и помещения их в очередь; впоследствии несколько потребителей будут выталкивать элементы из очереди и обрабатывать их. Я бы посоветовал вам попробовать 2-4 пользовательских потока на ядро ​​и провести некоторые измерения производительности, чтобы определить, какой из них наиболее оптимален (то есть количество потоков, обеспечивающих максимальную пропускную способность). Я бы не рекомендовал использовать ThreadPool для этого конкретного примера. P.S. Я не понимаю, в чем проблема с одной точкой отказа и использованием распределенных хеш-таблиц? Я знаю, что DHT звучат очень круто, но я бы сначала попробовал обычные методы, если у вас нет конкретной проблемы, которую вы пытаетесь решить.

3 голосов
/ 12 мая 2010

Я рекомендую ставить поток в очередь для каждого файла и отслеживать текущие потоки в словаре, запускать новый поток по завершении потока до максимального предела. Я предпочитаю создавать свои собственные потоки, когда они могут быть длительными, и использовать обратные вызовы, чтобы сигнализировать, когда они закончили или столкнулись с исключением. В приведенном ниже примере я использую словарь для отслеживания запущенных рабочих экземпляров. Таким образом, я могу вызвать экземпляр, если я хочу прекратить работу пораньше. Обратные вызовы также могут быть использованы для обновления пользовательского интерфейса с прогрессом и пропускной способностью. Вы также можете динамически регулировать ограничение рабочего потока для добавленных точек.

Пример кода является сокращенным демонстратором, но он работает.

class Program
{
    static void Main(string[] args)
    {
        Supervisor super = new Supervisor();
        super.LaunchWaitingThreads();

        while (!super.Done) { Thread.Sleep(200); }
        Console.WriteLine("\nDone");
        Console.ReadKey();
    }
}

public delegate void StartCallbackDelegate(int idArg, Worker workerArg);
public delegate void DoneCallbackDelegate(int idArg);

public class Supervisor
{
    Queue<Thread> waitingThreads = new Queue<Thread>();
    Dictionary<int, Worker> runningThreads = new Dictionary<int, Worker>();
    int maxThreads = 20;
    object locker = new object();

    public bool Done { 
        get { 
            lock (locker) {
                return ((waitingThreads.Count == 0) && (runningThreads.Count == 0)); 
            } 
        } 
    }

    public Supervisor()
    {
        // queue up a thread for each file
        Directory.GetFiles("C:\\folder").ToList().ForEach(n => waitingThreads.Enqueue(CreateThread(n)));
    }

    Thread CreateThread(string fileNameArg)
    {
        Thread thread = new Thread(new Worker(fileNameArg, WorkerStart, WorkerDone).ProcessFile);
        thread.IsBackground = true;
        return thread;
    }

    // called when a worker starts
    public void WorkerStart(int threadIdArg, Worker workerArg)
    {
        lock (locker)
        {
            // update with worker instance
            runningThreads[threadIdArg] = workerArg;
        }
    }

    // called when a worker finishes
    public void WorkerDone(int threadIdArg)
    {
        lock (locker)
        {
            runningThreads.Remove(threadIdArg);
        }
        Console.WriteLine(string.Format("  Thread {0} done", threadIdArg.ToString()));
        LaunchWaitingThreads();
    }

    // launches workers until max is reached
    public void LaunchWaitingThreads()
    {
        lock (locker)
        {
            while ((runningThreads.Count < maxThreads) && (waitingThreads.Count > 0))
            {
                Thread thread = waitingThreads.Dequeue();
                runningThreads.Add(thread.ManagedThreadId, null); // place holder so count is accurate
                thread.Start();
            }
        }
    }
}

public class Worker
{
    string fileName;
    StartCallbackDelegate startCallback;
    DoneCallbackDelegate doneCallback;
    public Worker(string fileNameArg, StartCallbackDelegate startCallbackArg, DoneCallbackDelegate doneCallbackArg)
    {
        fileName = fileNameArg;
        startCallback = startCallbackArg;
        doneCallback = doneCallbackArg;
    }

    public void ProcessFile()
    {
        startCallback(Thread.CurrentThread.ManagedThreadId, this);
        Console.WriteLine(string.Format("Reading file {0} on thread {1}", fileName, Thread.CurrentThread.ManagedThreadId.ToString()));
        File.ReadAllBytes(fileName);
        doneCallback(Thread.CurrentThread.ManagedThreadId);
    }
}
1 голос
/ 11 мая 2010

Вообще говоря, 1000 маленьких файлов (как маленький, кстати?) Не должны занимать шесть минут для обработки. В качестве быстрого теста выполните find "foobar" * в каталоге, содержащем файлы (первый аргумент в кавычках не имеет значения; это может быть что угодно) и посмотрите, сколько времени потребуется для обработки каждого файла. Если это займет больше одной секунды, я буду разочарован.

Если этот тест подтвердит мое подозрение, тогда процесс будет связан с процессором, и вы не получите никакого улучшения от разделения чтения на его собственный поток. Вы должны:

  1. Выясните, почему в среднем требуется более 350 мс для обработки небольшого ввода и, надеюсь, улучшить алгоритм.
  2. Если нет способа ускорить алгоритм и у вас есть многоядерный компьютер (в наши дни это делают почти все), используйте пул потоков, чтобы назначить 1000 задач для каждой задачи чтения одного файла.
1 голос
/ 11 мая 2010

Вы можете иметь центральную очередь, потоки считывателя должны были бы иметь доступ для записи во время передачи содержимого в памяти в очередь. Потокам обработки потребуется доступ на чтение к этой центральной очереди, чтобы вытолкнуть следующий поток памяти, подлежащий обработке. Таким образом, вы минимизируете время, затрачиваемое на блокировки, и не сталкиваетесь со сложностями кода без блокировки.

РЕДАКТИРОВАТЬ: В идеале, вы должны обрабатывать все исключения / условия ошибки (если таковые имеются), так что у вас нет точек сбоя.

В качестве альтернативы, вы можете иметь несколько потоков, каждый из которых «запрашивает» файл, переименовывая его перед обработкой, таким образом, файловая система становится реализацией для заблокированного доступа. Понятия не имею, если это более эффективно, чем мой первоначальный ответ, только тестирование покажет.

0 голосов
/ 11 мая 2010

Вы можете рассмотреть очередь файлов для обработки. Заполните очередь один раз, просканировав каталог при запуске и обновив очередь с помощью FileSystemWatcher , чтобы эффективно добавлять новые файлы в очередь без постоянного повторного сканирования каталога.

Если это вообще возможно, читать и записывать на разные физические диски. Это даст вам максимальную производительность ввода-вывода.

Если у вас есть первоначальный пакет из множества файлов для обработки, а затем неравномерный темп добавления новых файлов, и все это происходит на одном диске (чтение / запись), вы можете рассмотреть буферизацию обработанного файлы в память, пока не будет выполнено одно из двух условий:

  • Нет (временно) новых файлов
  • Вы буферизовали так много файлов, что Вы не хотите использовать больше памяти для буферизация (в идеале настраиваемый порог)

Если ваша фактическая обработка файлов требует интенсивной загрузки ЦП, вы можете рассмотреть возможность использования одного потока обработки на ядро ​​ЦП. Однако для «нормальной» обработки процессорное время будет тривиальным по сравнению со временем ввода-вывода, и сложность не будет стоить каких-либо незначительных выигрышей.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...