Чтение и запись нескольких файлов параллельно - PullRequest
2 голосов
/ 05 января 2012

Мне нужно написать программу на Java, которая будет читать относительно большое количество (~ 50 000) файлов в дереве каталогов, обрабатывать данные и выводить обработанные данные в отдельный (плоский) каталог.

В настоящее время у меня есть что-то вроде этого:

private void crawlDirectoyAndProcessFiles(File directory) {
  for (File file : directory.listFiles()) {
    if (file.isDirectory()) {
      crawlDirectoyAndProcessFiles(file);
    } else { 
      Data d = readFile(file);
      ProcessedData p = d.process();
      writeFile(p,file.getAbsolutePath(),outputDir);
    }
  }
}

Достаточно сказать, что каждый из этих методов удален и обрезан для удобства чтения, но все они работают нормально.Весь процесс работает нормально, за исключением того, что он медленный.Обработка данных происходит через удаленный сервис и занимает от 5 до 15 секунд.Умножьте это на 50 000 ...

Я никогда не делал ничего многопоточного раньше, но я полагаю, что если получу, то получу довольно неплохое увеличение скорости.Кто-нибудь может дать некоторые указания, как я могу эффективно распараллелить этот метод?

Ответы [ 3 ]

9 голосов
/ 05 января 2012

Я бы использовал ThreadPoolExecutor для управления потоками.Вы можете сделать что-то вроде этого:

private class Processor implements Runnable {
    private final File file;

    public Processor(File file) {
        this.file = file;
    }

    @Override
    public void run() {
        Data d = readFile(file);
        ProcessedData p = d.process();
        writeFile(p,file.getAbsolutePath(),outputDir);
    }
}

private void crawlDirectoryAndProcessFiles(File directory, Executor executor) {
    for (File file : directory.listFiles()) {
        if (file.isDirectory()) {
          crawlDirectoryAndProcessFiles(file,executor);
        } else {
            executor.execute(new Processor(file); 
        }
    }
}

Вы получите Executor, используя:

ExecutorService executor = Executors.newFixedThreadPool(poolSize);

, где poolSize - максимальное количество потоков, которые вы хотите создать одновременно.(Здесь важно иметь разумное число; 50000 потоков - это не совсем хорошая идея. Разумное число может быть 8.) Обратите внимание, что после того, как вы поставили все файлы в очередь, ваш основной поток может ждать, пока все не будет сделано, вызываяexecutor.awaitTermination.

6 голосов
/ 05 января 2012

Если у вас есть один жесткий диск (то есть тот, который допускает только одну одновременную операцию чтения, а не SSD или RAID-массив, сетевая файловая система и т. Д.), То вам нужен только один поток, выполняющий ввод-вывод (чтение из /запись на диск).Кроме того, вы хотите, чтобы столько потоков выполняло операции с привязкой к процессору, сколько у вас ядер, в противном случае время будет потрачено впустую на переключение контекста.

Учитывая приведенные выше ограничения, приведенный ниже код должен работать для вас.Однопоточный исполнитель гарантирует, что только один Runnable выполняется одновременно.Фиксированный пул потоков гарантирует, что в каждый момент времени выполняется не более NUM_CPUS Runnable с.

Одна вещь, которую он не делает, - это предоставление обратной связи по окончании обработки.

private final static int NUM_CPUS = 4;

private final Executor _fileReaderWriter = Executors.newSingleThreadExecutor();
private final Executor _fileProcessor = Executors.newFixedThreadPool(NUM_CPUS);

private final class Data {}
private final class ProcessedData {}

private final class FileReader implements Runnable
{
  private final File _file;
  FileReader(final File file) { _file = file; }
  @Override public void run() 
  { 
    final Data data = readFile(_file);
    _fileProcessor.execute(new FileProcessor(_file, data));
  }

  private Data readFile(File file) { /* ... */ return null; }    
}

private final class FileProcessor implements Runnable
{
  private final File _file;
  private final Data _data;
  FileProcessor(final File file, final Data data) { _file = file; _data = data; }
  @Override public void run() 
  { 
    final ProcessedData processedData = processData(_data);
    _fileReaderWriter.execute(new FileWriter(_file, processedData));
  }

  private ProcessedData processData(final Data data) { /* ... */ return null; }
}

private final class FileWriter implements Runnable
{
  private final File _file;
  private final ProcessedData _data;
  FileWriter(final File file, final ProcessedData data) { _file = file; _data = data; }
  @Override public void run() 
  { 
    writeFile(_file, _data);
  }

  private Data writeFile(final File file, final ProcessedData data) { /* ... */ return null; }
}

public void process(final File file)   
{ 
  if (file.isDirectory())
  {
    for (final File subFile : file.listFiles())
      process(subFile);
  }
  else
  {
    _fileReaderWriter.execute(new FileReader(file));
  }
}
1 голос
/ 05 января 2012

Самый простой (и, вероятно, один из самых разумных) способов - это иметь пул потоков (см. Соответствующий Executor). Основной поток отвечает за сканирование в каталоге. При обнаружении файла создайте «Задание» (которое является Runnable / Callable) и дайте Исполнителю справиться с заданием.

(этого должно быть достаточно для начала, я предпочитаю не давать слишком много конкретного кода, потому что вам не составит труда выяснить, прочитав часть Executor, Callable и т. Д.)

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...