ExecutorService с огромным количеством задач - PullRequest
0 голосов
/ 28 июня 2018

У меня есть список файлов и список анализаторов, которые анализируют эти файлы. Количество файлов может быть большим (200 000) и количество анализаторов (1000). Таким образом, общее количество операций может быть очень большим (200 000 000). Теперь мне нужно применить многопоточность, чтобы ускорить процесс. Я придерживался этого подхода:

ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
for (File file : listOfFiles) {
  for (Analyzer analyzer : listOfAnalyzers){
    executor.execute(() -> {
      boolean exists = file.exists();
      if(exists){
        analyzer.analyze(file);
      }
    });
  }
}
executor.shutdown();
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);

Но проблема этого подхода в том, что он берет слишком много из памяти, и я думаю, что есть лучший способ сделать это. Я все еще новичок в Java и многопоточности.

Ответы [ 2 ]

0 голосов
/ 28 июня 2018

Одна идея состоит в том, чтобы использовать алгоритм fork / join и группировать элементы (файлы) в пакеты для индивидуальной обработки.

Мое предложение следующее:

  1. Во-первых, отфильтруйте все файлы, которые не существуют - они занимают ресурсы без необходимости.
  2. Следующий псевдокод демонстрирует алгоритм, который может вам помочь:

    public static class CustomRecursiveTask extends RecursiveTask<Integer {
    
    private final Analyzer[] analyzers;
    
    private final int threshold;
    
    private final File[] files;
    
    private final int start;
    
    private final int end;
    
    public CustomRecursiveTask(Analyzer[] analyzers,
                               final int threshold,
                               File[] files,
                               int start,
                               int end) {
        this.analyzers = analyzers;
        this.threshold = threshold;
        this.files = files;
        this.start = start;
        this.end = end;
    }
    
    @Override
    protected Integer compute() {
        final int filesProcessed = end - start;
        if (filesProcessed < threshold) {
            return processSequentially();
        } else {
            final int middle = (start + end) / 2;
            final int analyzersCount = analyzers.length;
    
            final ForkJoinTask<Integer> left =
                    new CustomRecursiveTask(analyzers, threshold, files, start, middle);
            final ForkJoinTask<Integer> right =
                    new CustomRecursiveTask(analyzers, threshold, files, middle + 1, end);
            left.fork();
            right.fork();
    
            return left.join() + right.join();
        }
    }
    
    private Integer processSequentially() {
        for (int i = start; i < end; i++) {
            File file = files[i];   
            for(Analyzer analyzer : analyzers) { analyzer.analyze(file) };
        }
    
        return 1;
    }
    }
    

И использование выглядит следующим образом:

 public static void main(String[] args) {
    final Analyzer[] analyzers = new Analyzer[]{};
    final File[] files = new File[] {};

    final int threshold = files.length / 5;

    ForkJoinPool.commonPool().execute(
            new CustomRecursiveTask(
                    analyzers,
                    threshold,
                    files,
                    0,
                    files.length
            )
    );
}

Обратите внимание, что в зависимости от ограничений вы можете манипулировать аргументами конструктора задачи, чтобы алгоритм подстраивался под количество файлов.

Вы можете указать различные threshold s, скажем, в зависимости от количества файлов.

final int threshold;
if(files.length > 100_000) {
   threshold = files.length / 4;
} else {
   threshold = files.length / 8;
}

Вы также можете указать количество рабочих потоков в ForkJoinPool в зависимости от введенного количества.

Измерьте, отрегулируйте, измените, в конечном итоге вы решите проблему.

Надеюсь, это поможет.

UPDATE:

Если анализ результатов не представляет интереса, вы можете заменить RecursiveTask на RecursiveAction. Псевдокод добавляет промежуточный автобокс между ними.

0 голосов
/ 28 июня 2018

Где будут находиться 200 миллионов задач? Надеюсь, не в памяти, если только вы не планируете реализовывать свое решение в распределенном режиме. Тем временем вам нужно создать экземпляр ExecutorService, который не накапливает массивную очередь. Используйте с «политикой выполнения вызовов» (см. здесь ) при создании службы . Если вы попытаетесь поместить еще одну задачу в очередь, когда она уже заполнена, вы в конечном итоге выполните ее самостоятельно, что, вероятно, вам и нужно.

OTOH, теперь, когда я посмотрю на ваш вопрос более добросовестно, почему бы не проанализировать одновременно один файл? Тогда очередь никогда не будет больше количества анализаторов. Это то, что я бы сделал, честно говоря, так как я хотел бы, чтобы читаемый журнал содержал сообщение для каждого файла при его загрузке в правильном порядке.

Я прошу прощения за то, что не был более полезным:

analysts.stream().map(analyst -> executor.submit(() -> analyst.analyze(file))).map(Future::get);

По сути, создайте связку фьючерсов для одного файла, затем подождите всех из них, прежде чем двигаться дальше.

...