Как указать ForkJoinPool для Java 8 параллельный поток? - PullRequest
0 голосов
/ 12 сентября 2018

Как я знаю, параллельные потоки используют значение по умолчанию ForkJoinPool.commonPool, которое по умолчанию имеет на один поток меньше, чем ваши процессоры.Я хочу использовать свой собственный пул потоков.

Вот так:

@Test
public void stream() throws Exception {
    //System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
    ForkJoinPool pool = new ForkJoinPool(10);
    List<Integer> testList = Lists.newArrayList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20);
    long start = System.currentTimeMillis();
    List<Integer> result = pool.submit(() -> testList.parallelStream().map(item -> {
        try {
            // read from database
            Thread.sleep(1000);
            System.out.println("task" + item + ":" + Thread.currentThread());
        } catch (Exception e) {
        }
        return item * 10;
    })).get().collect(Collectors.toList());
    System.out.println(result);
    System.out.println(System.currentTimeMillis() - start);
}

И результат: enter image description here

Мой пользовательский ForkJoinPool никогда не используется.И я изменяю параллелизм по умолчанию следующим образом:

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");

Это работает хорошо - задачи стоят всего около 1 секунды.

В моем приложении задача содержит тяжелую операцию ввода-вывода (чтение данных из БД).Поэтому мне нужен более высокий параллелизм, но я не хочу изменять свойство JVM.

Итак, как правильно указать мой собственный ForkJoinPool?

Или как использовать параллельные потоки вIO-интенсивная ситуация?

Ответы [ 2 ]

0 голосов
/ 12 сентября 2018

потоки ленивы;Вся работа выполняется, когда вы начинаете работу терминала.В вашем случае операция терминала - .collect(Collectors.toList()), которую вы вызываете в потоке main по результату get().Следовательно, фактическая работа будет выполняться так же, как если бы вы построили весь поток в потоке main.

Чтобы ваш пул имел эффект, вы должны переместить операцию терминала вотправленное задание:

ForkJoinPool pool = new ForkJoinPool(10);
List<Integer> testList = Arrays.asList(
    1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20);
long start = System.currentTimeMillis();
List<Integer> result = pool.submit(() -> testList.parallelStream().map(item -> {
    try {
        // read from database
        Thread.sleep(1000);
        System.out.println("task" + item + ":" + Thread.currentThread());
    } catch (InterruptedException e) {}
    return item * 10;
}).collect(Collectors.toList())).join();
System.out.println(result);
System.out.println(System.currentTimeMillis() - start);

Мы также можем продемонстрировать актуальность операции терминала, построив поток в потоке main и отправив только операцию терминала в пул:

Stream<Integer> stream = testList.parallelStream().map(item -> {
    try {
        // read from database
        Thread.sleep(1000);
        System.out.println("task" + item + ":" + Thread.currentThread());
    } catch (InterruptedException e) {}
    return item * 10;
});
List<Integer> result = pool.submit(() -> stream.collect(Collectors.toList())).join();

Но вы должны иметь в виду, что это недокументированное поведение, которое не гарантируется.Фактический ответ должен состоять в том, что Stream API в его текущей форме, без контроля потока (и без помощи при работе с проверенными исключениями), не подходит для параллельных операций ввода-вывода.

0 голосов
/ 12 сентября 2018

Полагаю, вы обнаружили описанный здесь трюк:

, который сообщает

Хитрость основана на ForkJoinTask.fork, который указывает: "Обеспечивает асинхронное выполнение этой задачи в пуле, в котором выполняется текущая задача, если применимо, или использование ForkJoinPool.commonPool(), если не inForkJoinPool()"

В вашем коде parallelStream() и map(...) вызываются в пользовательском ForkJoinPool, а Function, переданном map, - нет.

Помните, что Stream#map является промежуточной операцией.Его Function будет выполняться для его элемента только после того, как будет завершена операция терминала.В вашем случае эта операция терминала collect(...).И поскольку collect(Collectors.toList() вызывается в потоке main, map Function вызывается для каждого элемента параллельно в commonPool.

Вы можете просто переместить collect(...) вызов внутри вашего submit(...).

List<Integer> result = pool.submit(() -> testList.parallelStream().map(item -> {
    try {
        // read from database
        Thread.sleep(1000);
        System.out.println("task" + item + ":" + Thread.currentThread());
    } catch (Exception e) {
    }
    return item * 10;
}).collect(Collectors.toList())).get();
...