Как мне запустить что-то параллельное в Java? - PullRequest
1 голос
/ 08 апреля 2020

Я пытаюсь распечатать все возможные комбинации в диапазоне. Например, если мой lowerBound равен 3, а мой max равен 5, я хочу следующие комбинации: (5,4 - 5,3 - 4,3). Я реализовал это с помощью функции helper(), описанной ниже.

Конечно, если мой максимум очень большой, это много комбинаций, и это займет много времени. Вот почему я пытаюсь реализовать ForkJoinPool, чтобы задачи выполнялись параллельно. Для этого я создаю новый ForkJoinPool. Затем I l oop по всем возможным значениям r (где r - количество чисел в комбинации, в приведенном выше примере r=3). Для каждого значения r я создаю новый HelperCalculator, который расширяет RecursiveTask<Void>. Там я рекурсивно вызываю функцию helper(). Каждый раз, когда я называю это, я создаю новый HelperCalculator, и я использую .fork() для этого.

Проблема заключается в следующем. Это не правильно генерирует все возможные комбинации. На самом деле он не генерирует никаких комбинаций вообще. Я пытался добавить calculator.join() после calculator.fork(), но это продолжается бесконечно, пока я не получу ошибку OutOfMemory.

Очевидно, что в ForkJoinPool я что-то не так понимаю, но я не могу ' после нескольких дней попыток.

Моя основная функция:

            ForkJoinPool pool = (ForkJoinPool) Executors.newWorkStealingPool();
            for (int r = 1; r < 25; r++) {
                int lowerBound = 7;
                int[] data = new int[r];
                int max = 25;
                calculator = new HelperCalculator(data, 0, max, 0, s, n, lowerBound);
                pool.execute(calculator);
                calculator.join();
            }
            pool.shutdown();

Класс HelperCalculator:

    protected Void compute() {
        helper(data, end, start, index, s, lowerBound);
        return null;
    }

    //Generate all possible combinations
    public void helper(int[] data , int end, int start, int index,int s, int lowerBound) {
        //If the array is filled, print it
        if (index == data.length) {
                System.out.println(Arrays.toString(data));
        } else if (start >= end) {
            data[index] = start;
            if(data[0] >= lowerBound) {
                HelperCalculator calculator = new HelperCalculator(data,end, start-1, index+1, s, n, lowerBound);
                calculator.fork();
                calculators.add(calculator);
                HelperCalculator calculator2 = new HelperCalculator(data, end, start-1, index, s, n, lowerBound);
                calculator2.fork();
                calculators.add(calculator2);
            }
        }

Как сделать каждый HelperCalculator работать параллельно, чтобы одновременно было запущено 23 приложения с использованием ForkJoinPool? Или, может быть, я должен использовать другое решение?

Я пытался вызвать join() и isDone() в списке calculators, но тогда он не ждет, пока он не завершит sh должным образом и программа просто выходит.

Поскольку кто-то не понимает алгоритм, вот он:

    public static void main(String[] args) {
            for(int r = 3; r > 0; r--) {
                int[] data = new int[r];
                helper(data, 0, 2, 0);
            }
    }

    public static void helper(int[] data , int end, int start, int index) {
        if (index == data.length) {
            System.out.println(Arrays.toString(data));
        } else if (start >= end) {
            data[index] = start;
                helper(data, end, start - 1, index + 1);
                helper(data, end, start - 1, index);
            }
        }
    }

Вывод этого:

[2, 1, 0]
[2, 1]
[2, 0]
[1, 0]
[2]
[1]
[0]

1 Ответ

3 голосов
/ 09 апреля 2020

Некоторые из выполняемых вами задач пытаются использовать один и тот же массив для оценки различных комбинаций. Вы можете решить эту проблему, создав отдельный массив для каждой задачи или ограничив параллелизм теми задачами, у которых уже есть свой массив, т. Е. С другой длиной.

Но есть и другая возможность; не используйте массивы вообще. Вы можете сохранить комбинации в значениях int, так как каждое значение int является комбинацией битов. Это не только экономит много памяти, но вы также можете легко перебирать все возможные комбинации, просто увеличивая значение, поскольку перебор всех чисел int также перебирает все возможные комбинации битов¹. Единственное, что нам нужно реализовать, - это генерировать правильную строку для конкретного значения int, интерпретируя биты как числа в соответствии с их положением.

Для первой попытки мы можем выбрать более простой способ и использовать уже существующие классы:

public static void main(String[] args) {
    long t0 = System.nanoTime();
    combinations(10, 25);
    long t1 = System.nanoTime();
    System.out.println((t1 - t0)/1_000_000+" ms");
    System.out.flush();
}
static void combinations(int start, int end) {
    for(int i = 1, stop = (1 << (end - start)) - 1; i <= stop; i++) {
        System.out.println(
            BitSet.valueOf(new long[]{i}).stream()
                  .mapToObj(b -> String.valueOf(b + start))
                  .collect(Collectors.joining(", ", "[", "]"))
        );
    }
}

Метод использует эксклюзивный конец, поэтому для вашего примера вы должны назвать его как combinations(0, 3), и он выведет

[0]
[1]
[0, 1]
[2]
[0, 2]
[1, 2]
[0, 1, 2]
3 ms

из Конечно, время может отличаться

Для приведенного выше примера combinations(10, 25) он печатает все комбинации, а затем 3477 ms на моем аппарате. Это звучит как возможность оптимизировать, но мы должны сначала подумать о том, какие операции накладывают какие затраты.

Итерирование по комбинациям теперь сведено к тривиальной операции. Создание строки на порядок дороже. Но это все равно ничто по сравнению с фактической печатью, которая включает в себя передачу данных в операционную систему и, в зависимости от системы, фактический рендеринг может добавить к нашему времени. Так как это делается при удержании блокировки в PrintStream, все потоки, пытающиеся одновременно печатать, будут заблокированы, что сделает эту операцию непараллельной.

Давайте определим долю затрат, создав новую PrintStream, отключение авто-гриппа sh при разрыве строки и использование безумно большого буфера, способного вместить весь вывод:

public static void main(String[] args) {
    System.setOut(new PrintStream(
        new BufferedOutputStream(new FileOutputStream(FileDescriptor.out),1<<20),false));
    long t0 = System.nanoTime();
    combinations(10, 25);
    long t1 = System.nanoTime();
    System.out.flush();
    long t2 = System.nanoTime();
    System.out.println((t1 - t0)/1_000_000+" ms");
    System.out.println((t2 - t0)/1_000_000+" ms");
    System.out.flush();
}
static void combinations(int start, int end) {
    for(int i = 1, stop = (1 << (end - start)) - 1; i <= stop; i++) {
        System.out.println(
            BitSet.valueOf(new long[]{i}).stream()
                  .mapToObj(b -> String.valueOf(b + start))
                  .collect(Collectors.joining(", ", "[", "]"))
        );
    }
}

На моей машине он печатает что-то в порядке

93 ms
3340 ms

Показывает, что код потратил более трех секунд на непараллелизуемую печать и всего около 100 миллисекунд на расчет. Для полноты следующий код понижает уровень для поколения String:

static void combinations(int start, int end) {
    for(int i = 1, stop = (1 << (end - start)) - 1; i <= stop; i++) {
        System.out.println(bits(i, start));
    }
}
static String bits(int bits, int offset) {
    StringBuilder sb = new StringBuilder().append('[');
    for(;;) {
        int bit = Integer.lowestOneBit(bits), num = Integer.numberOfTrailingZeros(bit);
        sb.append(num + offset);
        bits -= bit;
        if(bits == 0) break;
        sb.append(", ");
    }
    return sb.append(']').toString();
}

, что вдвое сокращает время расчета на моей машине, но не оказывает заметного влияния на общее время, которое не должно приходить как сюрприз.


Но в целях обучения, игнорируя отсутствие потенциального ускорения, давайте обсудим, как мы распараллелим эту операцию.

Последовательный код уже привел задачу в форма, которая сводится к итерации от начального значения до конечного значения. Теперь мы переписываем этот код в ForkJoinTask (или подходящий подкласс), который представляет итерацию с начальным и конечным значением. Затем мы добавляем возможность разбивать эту операцию на две части, разделяя диапазон посередине, чтобы получить две задачи, повторяющиеся по каждой половине диапазона. Это может повторяться до тех пор, пока мы не решим иметь достаточно потенциально параллельных заданий и выполнить текущую итерацию локально. После локальной обработки мы должны дождаться завершения любой задачи, которую мы разделили, чтобы гарантировать, что завершение задачи root подразумевает завершение всех подзадач.

public class Combinations extends RecursiveAction {
    public static void main(String[] args) {
        System.setOut(new PrintStream(new BufferedOutputStream(
            new FileOutputStream(FileDescriptor.out),1<<20),false));
        ForkJoinPool pool = (ForkJoinPool) Executors.newWorkStealingPool();
        long t0 = System.nanoTime();
        Combinations job = Combinations.get(10, 25);
        pool.execute(job);
        job.join();
        long t1 = System.nanoTime();
        System.out.flush();
        long t2 = System.nanoTime();
        System.out.println((t1 - t0)/1_000_000+" ms");
        System.out.println((t2 - t0)/1_000_000+" ms");
        System.out.flush();
    }

    public static Combinations get(int min, int max) {
        return new Combinations(min, 1, (1 << (max - min)) - 1);
    }

    final int offset, from;
    int to;

    private Combinations(int offset, int from, int to) {
        this.offset = offset;
        this.from = from;
        this.to = to;
    }

    @Override
    protected void compute() {
        ArrayDeque<Combinations> spawned = new ArrayDeque<>();
        while(getSurplusQueuedTaskCount() < 2) {
            int middle = (from + to) >>> 1;
            if(middle == from) break;
            Combinations forked = new Combinations(offset, middle, to);
            forked.fork();
            spawned.addLast(forked);
            to = middle - 1;
        }
        performLocal();
        for(;;) {
            Combinations forked = spawned.pollLast();
            if(forked == null) break;
            if(forked.tryUnfork()) forked.performLocal(); else forked.join();
        }
    }

    private void performLocal() {
        for(int i = from, stop = to; i <= stop; i++) {
            System.out.println(bits(i, offset));
        }
    }

    static String bits(int bits, int offset) {
        StringBuilder sb = new StringBuilder().append('[');
        for(;;) {
            int bit=Integer.lowestOneBit(bits), num=Integer.numberOfTrailingZeros(bit);
            sb.append(num + offset);
            bits -= bit;
            if(bits == 0) break;
            sb.append(", ");
        }
        return sb.append(']').toString();
    }
}

getSurplusQueuedTaskCount() дает нам подсказку о насыщенности рабочих потоков, другими словами, может ли быть полезным разветвление большего количества рабочих мест. Возвращенное число сравнивается с пороговым значением, которое обычно является небольшим числом, чем более неоднородны задания и, следовательно, ожидаемая рабочая нагрузка, тем выше должно быть пороговое значение, чтобы обеспечить большее количество краж работы, когда задания завершаются раньше, чем другие. В нашем случае рабочая нагрузка должна быть очень хорошо сбалансирована.

Существует два способа разделения. Примеры часто создают две или более разветвленных подзадач с последующим присоединением к ним. Это может привести к большому количеству задач, ожидающих других. Альтернативой является разветвление подзадачи и изменение текущей задачи, чтобы представить другую. Здесь разветвленная задача представляет диапазон [middle, to], тогда как текущая задача модифицируется для представления диапазона [from, middle].

После разветвления достаточного количества задач оставшийся диапазон обрабатывается локально в текущем потоке. Затем задача будет ожидать всех разветвленных подзадач с одной оптимизацией: она будет пытаться отменить подзадачи, чтобы обработать их локально, если ни один другой рабочий поток еще не украл их.

Это работает плавно, но, к сожалению, как и ожидалось, оно не ускоряет работу, поскольку самой дорогой является печать.


¹ Использование int для представления всех комбинаций уменьшает поддерживаемую длину диапазона до 31, но имейте в виду, что такая длина диапазона подразумевает 2³¹ - 1 комбинаций, что довольно много для итерации. Если это все еще похоже на ограничение, вы можете изменить код на long. Поддерживаемой тогда длины диапазона 63, другими словами 2⁶³ - 1 комбинаций, достаточно, чтобы компьютер был занят до конца юниверса.

...