Почему обработка параллельного потока с использованием лямбды в блоке статического инициализатора с помощью forEachOrdered приводит к взаимоблокировке, но не с помощью forEach? - PullRequest
0 голосов
/ 30 октября 2018

Во время игры с параллельными потоками Java у меня возникали взаимоблокировки, когда некоторые параллельные операции выполнялись в статическом блоке инициализатора.

При использовании последовательного потока все работает нормально:

import java.util.Arrays;
public class Example1 {
    static {
        // displays the numbers from 1 to 10 ordered => no thread issue
        Arrays.asList(1,2,3,4,5,6,7,8,9,10)
             .forEach(s->System.out.println(s));
    }
    public static final void main(String[] args) {}
}

При параллельной обработке потока каждая работа (числа отображаются без порядка):

import java.util.Arrays;
public class Example2 {
    static {
        // displays the numbers from 1 to 10 unordered => no thread issue
        Arrays.asList(1,2,3,4,5,6,7,8,9,10).parallelStream()
             .forEach(s->System.out.println(s));
    }
    public static final void main(String[] args) {}
}

Однако при обработке потока с помощью forEachOrdered() возникает тупик (я полагаю, это связано с взаимодействием между основным потоком и управлением ForkJoinPool):

import java.util.Arrays;
public class Example3 {
    static {
        // hangs forever (deadlock between the main thread which loads the class and the underlying ForkJoinPool which join several tasks)
        Arrays.asList(1,2,3,4,5,6,7,8,9,10).parallelStream()
                .forEachOrdered(s->System.out.println(s));
    }
    public static final void main(String[] args) {}
}

Но при порождении потоковой обработки в отдельном потоке все идет хорошо:

import java.util.Arrays;
public class Example4 {
    static {
        // displays the numbers from 1 to 10 ordered => no thread issue
        new Thread(()->
            Arrays.asList(1,2,3,4,5,6,7,8,9,10).parallelStream()
                 .forEachOrdered(s->System.out.println(s))
        ).start();
    }
    public static final void main(String[] args) {}
}

Из того, что я видел из дампа потоков, основной поток ожидает на ForkJoinPool, используемом в .forEachOrdered(), чтобы завершить свою работу, но первый рабочий поток в пуле заблокирован, ожидая чего-то (скорее всего, заблокирован main)

Мне бы очень хотелось понять, почему в некоторых случаях возникает тупик, а не в других. Это, очевидно, не только из-за использования статического блока инициализатора, параллельного потока и лямбды, потому что Example2, Example3 и Example4 используют эти три понятия, но только Example3 вызывает тупик.

Хотя этот вопрос может выглядеть как дубликат Почему параллельный поток с лямбдой в статическом инициализаторе вызывает тупик? , это не так. Мой вопрос выходит за рамки связанного, поскольку он предоставляет Example2, для которого у нас есть статический блок инициализатора, параллельный поток и лямбда, но нет тупика. Вот почему заголовок вопроса содержит «может привести к тупику, но не обязательно».

1 Ответ

0 голосов
/ 30 октября 2018

Это тупиковое поведение имеет две основные причины:

  1. Поток main ожидает, что другой поток (скажем, OtherThread) завершит свою работу (в примере 3 OtherThread является одним из потоков ForkJoinPool, используемых операцией forEachOrdered())
  2. OtherThread использует лямбда-выражение, которое будет определено потоком main, но позже (напомним: лямбда-выражения создаются во время выполнения, а не во время компиляции). В примере 3 эта лямбда - та, что в .forEachOrdered().

Давайте рассмотрим примеры и объясним, почему они приводят к тупику или нет.

Пример1

Только один поток (main) выполняет следующие операции:

  1. обрабатывает статический блок инициализатора
  2. сделать foreach для каждого элемента
  3. создает лямбда-выражение во время выполнения при обработке первого элемента потока

Поскольку существует только один поток, тупиковая ситуация невозможна.

* +1033 * Пример2

Чтобы лучше понять процесс обработки, мы можем переписать его как:

import java.util.Arrays;
public class Example2Instrumented {
    static {
        // displays the numbers from 1 to 10 unordered => no thread issue
        System.out.println(Thread.currentThread().getName()+" : "+"static initializer");
        Arrays.asList(1,2,3,4,5,6,7,8,9,10)
             .parallelStream()
             .forEach(s->System.out.println(Thread.currentThread().getName()+" : "+s));
    }
    public static final void main(String[] args) {}
}

Это дает следующий результат:

main : static initializer
main : 7
main : 6
ForkJoinPool.commonPool-worker-2 : 9
ForkJoinPool.commonPool-worker-4 : 5
ForkJoinPool.commonPool-worker-9 : 3
ForkJoinPool.commonPool-worker-11 : 2
ForkJoinPool.commonPool-worker-2 : 10
ForkJoinPool.commonPool-worker-4 : 4
ForkJoinPool.commonPool-worker-9 : 1
ForkJoinPool.commonPool-worker-13 : 8

Поток main обрабатывает статический инициализатор, затем запускает forEach и строит лямбда во время выполнения при обработке первого элемента. Другие элементы потока обрабатываются рабочими потоками из ForkJoinPool. Нет тупика, потому что поток main обработал первый элемент и построил лямбду.

1047 * Пример3 * Мы можем переписать Example3 без лямбды, чтобы выйти из тупика: import java.util.Arrays; import java.util.function.Consumer; public class Example3NoDeadlock { static { // displays the numbers from 1 to 10 ordered => no thread issue anymore Arrays.asList(1,2,3,4,5,6,7,8,9,10).parallelStream() .forEachOrdered( new Consumer<Integer>() { @Override public void accept(Integer t) { System.out.println(t); }}); } public static final void main(String[] args) {} } Поскольку класс Consumer создается во время компиляции (в отличие от лямбд, которые создаются во время выполнения), это прерывает цикл взаимоблокировки. Это доказывает, что по крайней мере лямбда вовлечена в тупик. Для лучшего понимания мы могли бы использовать код следующим образом: import java.util.Arrays; import java.util.function.Consumer; public class Example3Instrumented { static { System.out.println("static initializer"); // hangs forever (deadlock between the main thread which loads the class and the underlying ForkJoinPool which join several tasks) Arrays.asList(1,2,3,4,5,6,7,8,9,10).parallelStream() .peek(new Consumer<Integer>() { @Override public void accept(Integer t) { System.out.println(Thread.currentThread().getName()+" "+t); }}) .forEachOrdered(s->System.out.println(s)); } public static final void main(String[] args) {} } Это дает следующий вывод: main : static initializer ForkJoinPool.commonPool-worker-6 1 ForkJoinPool.commonPool-worker-9 3 main 7 ForkJoinPool.commonPool-worker-4 2 ForkJoinPool.commonPool-worker-13 6 ForkJoinPool.commonPool-worker-11 8 ForkJoinPool.commonPool-worker-15 5 ForkJoinPool.commonPool-worker-2 9 ForkJoinPool.commonPool-worker-4 10 ForkJoinPool.commonPool-worker-9 4 Поток main обрабатывает статический инициализатор, затем начинает обработку forEachOrdered, создавая задачу для каждого элемента в потоке (для поддержания порядка используется сложный алгоритм на основе дерева, см. ForEachOps.ForEachOrderedTask: задачи создан, и из кода видно, что каждая задача ожидает выполнения другой задачи). Все задания представлены на ForkJoinPool. Я думаю, что тупик возникает, потому что первая задача обрабатывается рабочим потоком из ForkJoinPool, и этот поток ожидает в потоке main для создания лямбды. И поток main уже начал обрабатывать свою задачу и ожидает, пока другой рабочий поток завершит выполнение своей задачи. Отсюда тупик. Example4

В примере 4 мы создаем новый поток, который запускается асинхронно (то есть мы не ждем результата). Вот почему поток main не заблокирован, и теперь у него есть время для сборки Lambdas во время выполнения.

Заключение

Урок на вынос: если вы смешиваете статические инициализаторы, потоки и лямбда-выражения, вы должны действительно понимать, как реализованы эти концепции, в противном случае у вас могут возникнуть тупики.

...