как использовать несколько потоков для обработки большого количества файлов, хранящихся на локальном диске (с помощью блокировки файлов) - PullRequest
3 голосов
/ 18 сентября 2009

как использовать несколько потоков в Java для обработки большого количества файлов, хранящихся в локальном каталоге диска (с помощью блокировки файлов)

Ответы [ 7 ]

5 голосов
/ 18 сентября 2009

Вы не хотите читать файлы в параллель (дисковый ввод / вывод не распараллеливается). Лучше позволить одному потоку читать файлы, отправлять содержимое рабочим потокам для параллельной обработки, а затем собирать результаты от рабочих. Использование превосходных ExecutorService & c: o от java.util.concurrent избавляет вас от грязных деталей многопоточности и делает ваше решение гораздо более гибким.

Вот простой пример. Предполагая, Foo является результатом обработки файла:

public List<Foo> processFiles(Iterable<File> files){
    List<Future<Foo>> futures = new ArrayList<Future<Foo>>();
    ExecutorService exec = Executors.newFixedThreadPool(
        Runtime.getRuntime().availableProcessors());
    for (File f : files){
        final byte[] bytes = readAllBytes(f); // defined elsewhere
        futures.add(exec.submit(new Callable<Foo>(){
            public Foo call(){
                InputStream in = new ByteArrayInputStream(bytes);
                // Read a Foo object from "in" and return it
            }
        }));
    }
    List<Foo> foos = new List<Foo>(futures.size());
    for (Future<Foo> f : futures) foos.add(f.get());
    exec.shutdown();
    return foos;
}

TODO: добавить обработку исключений и т. Д. Вы также можете создать экземпляр ExecutorService вне processFiles, чтобы вы могли использовать его между вызовами.

4 голосов
/ 18 сентября 2009

Лучший способ сделать это (на любом языке, не только на Java) - использовать парадигму производителя / мультипотребителя.

Пусть один поток создаст очередь, а затем запустит N другие потоки. Этот основной поток затем перечислит все файлы и поместит их имена в эту очередь. Затем он поместит N маркеры конца очереди в очередь.

«Другие» потоки просто читают следующее имя из этой очереди и обрабатывают файл. Когда они считывают маркер конца очереди, они выходят (и основной поток может получить свой статус выхода, если это необходимо).

Это упрощает связь между потоками в очереди (которая, разумеется, должна быть защищена мьютексом, чтобы не вызывать условия гонки со всеми потоками). Это также позволяет потокам контролировать свое собственное условие выхода (в направлении от основного потока), что является еще одним хорошим способом избежать определенных проблем с многопоточностью.

3 голосов
/ 18 сентября 2009

Вот как я обычно это делаю.

Вы можете создать очередь блокировки следующим образом:

 LinkedBlockingQueue<String> files;
 files = new LinkedBlockingQueue<String>(1000); 
 AtomicBoolean done = new AtomicBoolean(false);

Очередь может содержать только 1000 элементов, поэтому, если у вас есть миллиард файлов или что-то еще, вам не нужно беспокоиться о нехватке памяти. Вы можете изменить размер на любой, в зависимости от того, сколько памяти вы хотите занять.

В вашей основной теме вы делаете что-то вроде:

File directory = new File("path\to\folder");
for(File file : directory.listFiles()){
   files.put(file.getAbsolutePath());
}
files.put(null);//this last entry tells the worker threads to stop

Функциональные блоки put до тех пор, пока в очереди не освободится место, поэтому при заполнении файлы прекратят чтение. Конечно, поскольку File.listFiles () на самом деле возвращает массив, а не коллекцию, которую не нужно загружать целиком в память, вы все равно заканчиваете тем, что загружаете полный список файлов в память, если используете эту функцию. Если это окажется проблемой, думаю, вам придется заняться чем-то другим.

Но эта модель также работает, если у вас есть какой-то другой метод перечисления файлов (например, если они все находятся в базе данных или что-то еще). Просто замените вызов directory.listFiles () тем, что вы используете, чтобы получить свой файл. список. Кроме того, если вам нужно обрабатывать файлы в подкаталогах, вам придется рекурсивно просматривать их, что может раздражать (но это решает проблему с памятью для очень больших каталогов)

затем в ваших рабочих потоках:

public void run(){
   while(!done.get()){
      String filename = files.take();
      if(filename != null){
         //do stuff with your file.   
      }
      else{
        done.set(true);//signal to the other threads that we found the final element.
      }
   }
}

Если все файлы в очереди были обработаны, take будет ждать, пока не появятся новые элементы.

В любом случае, это основная идея, этот код находится у меня в голове и не был протестирован в точности как есть.

1 голос
/ 31 августа 2017

С Java 8 вы можете легко добиться этого, используя parallel streams. Смотрите следующий фрагмент кода:

    try {
        Files.walk(Paths.get("some-path")).parallel().forEach(file -> {/*do your processing*/});
    } catch (IOException e1) {
        e1.printStackTrace();
    }

При параллельном потоке время выполнения порождает необходимое количество потоков, не превышающее количество логических ядер ЦП, для параллельной обработки элементов коллекции, в нашем случае файлов. Вы также можете контролировать количество потоков, передавая его в качестве аргумента JVM.

Преимущество этого подхода заключается в том, что вам не нужно выполнять низкоуровневую работу по созданию и поддержке потоков. Вы просто сосредотачиваетесь на своей проблеме высокого уровня.

0 голосов
/ 27 августа 2015

Я работаю над аналогичной проблемой, когда мне нужно обработать несколько тысяч текстовых файлов. У меня есть файл poller, который опрашивает каталог и готовит список файлов, найденных в каталоге (включая подкаталоги), и вызывает метод, скажем, fileFound со списком в качестве аргумента.

В методе fileFound я перебираю список и создаю новый поток для каждого файла. Я использую ExecutorService для контроля количества активных потоков. Код выглядит так:

public void fileFound(List<File> fileList) {
    for (File file : fileList) {
        FileProcessor fprocessor = new FileProcessor(file); // run() method takes care of implementing business rules for the file.
        EXECUTOR.submit(fprocessor); //ExecutorService EXECUTOR = Executors.newFixedThreadPool(10);
    }
}

Мое наблюдение:

  1. При обработке файлов один за другим, без многопоточности, при обработке файлов размером 3,5 КБ (всего ~ 32 ГБ) это заняло ~ 9 часов.
  2. Использование многопоточности:

    При фиксированном количестве потоков от 5 до 118 минут.

    При фиксированном количестве потоков от 10 до 75 минут.

    При фиксированном количестве потоков от 15 до 72 минут.

0 голосов
/ 18 сентября 2009

Что вы действительно хотите сделать, так это чтобы ваша основная программа прошла через каталог, получив Файл ссылок. Используйте эти ссылки для создания объекта, который реализует Runnable . Метод run () объекта Runnable - это вся ваша логика обработки. Создайте ExecutorService и вызовите execute (Runnable), чтобы передать задачи службе executor. Executor будет запускать задачи, в которых запрашиваемые потоки станут доступными, в зависимости от типа созданного вами Executor (Executors.newFixedThreadPool () - хороший выбор. Когда ваш основной поток обнаружил все файлы и отправил их как задачи, вы хотите вызвать 1009 * shutdown () в Executor, а затем вызвать [awaitTermination ()] [6]. Вызов shutdown () сообщает исполнителю о завершении выполнения заданных задач, а затем о закрытии, вызов awaitTermination () вызывает ваш основной поток блокировать до тех пор, пока исполнитель не выключится. Это, конечно, предполагает, что вы хотите дождаться завершения всех задач и затем выполнить дополнительную обработку.

[6]: http://java.sun.com/javase/6/docs/api/java/util/concurrent/ExecutorService.html#awaitTermination(long, java.util.concurrent.TimeUnit)

0 голосов
/ 18 сентября 2009

Большая часть работы была проделана для вас в классах Java Concurrency. Вы, вероятно, хотите что-то вроде ConcurrentLinkedQueue .

Неограниченная потокобезопасная очередь на основе связанных узлов. Эта очередь упорядочивает элементы FIFO (первым пришел-первым вышел). Главой очереди является тот элемент, который находился в очереди самый длинный раз. Хвост очереди - это тот элемент, который находился в очереди кратчайшее время. Новые элементы вставляются в конец очереди, а операции поиска очереди получают элементы в начале очереди. ConcurrentLinkedQueue является подходящим выбором, когда множество потоков будут совместно использовать доступ к общей коллекции.

Вы используете метод offer () для помещения записей в очередь, либо в основном потоке, либо в отдельном потоке. Затем у вас есть куча рабочих пчел (в идеале, созданных в чем-то вроде ExecutorService ), которые используют метод poll (), чтобы извлечь следующую запись из очереди и обработать ее.

Использование этого дизайна дает вам невероятную гибкость в определении того, сколько производителей и сколько потребителей работают одновременно, без необходимости делать какой-либо код ожидания / опроса самостоятельно. Вы можете создать свой пул миньонов, используя Executors.newFixedThreadPool ().

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...