Выполнять потоковые операции в параллельном потоке до тех пор, пока предыдущая потоковая операция не обработает все элементы - PullRequest
3 голосов
/ 19 января 2020

Мне интересно, ожидают ли потоковые операции в потоковом конвейере параллельного потока завершения предыдущей потоковой операции для завершения обработки всех элементов потока.

Если у меня есть следующий потоковый конвейер:

List <String> parsedNumbers = IntStream.range(1, 6)
    .parallel()
    .map(String::valueOf)
    .map(integerAsString => {
        System.out.println("First print statement: " + integerAsString);
        return integerAsString;
    })
    .map(integerAsString => {
        System.out.println("Second print statement: " + integerAsString);
        return integerAsString;
    })
    .collect(Collectors.toList());

Может ли случиться так, что System.out.println("First print statement: " + integerAsString) уже вызывается для элемента X, но операция String::parseInt все еще выполняется для другого элемента Y в потоке?

Может выводить этот код следующим образом:

Первый оператор печати: 1
Первый оператор печати: 2
Первый оператор печати : 3
Второй оператор печати: 1
Второй оператор печати: 2
Первый оператор печати: 4
Второй оператор печати: 3
Второй оператор печати: 4
Первый оператор печати: 5
Второе выражение для печати: 5

Воля всегда будет выглядеть так:

Первое выражение для печати: 1
Первая печать оператор: 2
первый оператор печати: 3
первый оператор печати: 4
первый оператор печати: 5
второй оператор печати: 1
второй оператор печати: 2
второй оператор печати: 3
Второй оператор печати: 4
Второй оператор печати: 5

Ответы [ 2 ]

5 голосов
/ 19 января 2020

Да, может. Intermediate этапы могут выполняться в любом порядке, terminal операции имеют определенный порядок, если источник для потока имеет порядок (в отличие от Set, например) и , а сам поток не имеет изменить этот порядок (вызывая unordered - хотя в данный момент это мало что делает).

То есть: вы не знаете, какой элемент будет проходить через один из этапов в данный момент времени , нет порядка того, как элементы обрабатываются для параллельного потока.

Большой вопрос - почему вас волнует? Предполагается, что промежуточные операции свободны от побочных эффектов, и полагаться на любой заказ - плохая идея.

4 голосов
/ 20 января 2020

Порядок обработки не гарантируется даже для последовательных потоков. Только конечный результат будет согласован с порядком встречи, если данные были таковыми.

При запуске следующего последовательного кода

List<String> parsedNumbers = IntStream.range(1, 6)
    .mapToObj(String::valueOf)
    .map(integerAsString -> {
        System.out.println("First print statement: " + integerAsString);
        return integerAsString;
    })
    .map(integerAsString -> {
        System.out.println("Second print statement: " + integerAsString);
        return integerAsString;
    })
    .collect(Collectors.toList());

будет напечатано

First print statement: 1
Second print statement: 1
First print statement: 2
Second print statement: 2
First print statement: 3
Second print statement: 3
First print statement: 4
Second print statement: 4
First print statement: 5
Second print statement: 5

показывает, что потоки не работают так, как вы ожидаете. Эталонная реализация имеет явное предпочтение в отношении прохождения каждого элемента через весь поток перед обработкой следующего. Когда вы включаете параллельную обработку, то же ядро ​​обработки c будет выполняться для каждого ядра ЦП.

Поэтому, когда я использую

List<String> parsedNumbers = IntStream.range(1, 6)
    .parallel()
    .mapToObj(String::valueOf)
    .map(integerAsString -> {
        System.out.println("First print statement: " + integerAsString);
        return integerAsString;
    })
    .map(integerAsString -> {
        System.out.println("Second print statement: " + integerAsString);
        return integerAsString;
    })
    .collect(Collectors.toList());

, я получаю что-то подобное на моей машине:

First print statement: 5
First print statement: 2
First print statement: 1
First print statement: 4
First print statement: 3
Second print statement: 5
Second print statement: 2
Second print statement: 1
Second print statement: 4
Second print statement: 3

, который может выглядеть как обработанный первый оператор печати как этап перед вторым, но это всего лишь совпадение того, что у него больше ядер ЦП, чем у потоковых элементов, и удачное время. Например, когда я изменяю range(1, 6) на range(1, 18), я получаю что-то вроде

First print statement: 6
First print statement: 10
First print statement: 9
First print statement: 3
First print statement: 15
First print statement: 5
Second print statement: 9
First print statement: 11
First print statement: 8
Second print statement: 3
Second print statement: 11
Second print statement: 5
Second print statement: 10
Second print statement: 6
First print statement: 7
First print statement: 12
Second print statement: 8
Second print statement: 15
Second print statement: 12
Second print statement: 7
First print statement: 2
First print statement: 17
First print statement: 14
First print statement: 4
Second print statement: 14
Second print statement: 17
Second print statement: 2
First print statement: 1
First print statement: 16
First print statement: 13
Second print statement: 16
Second print statement: 1
Second print statement: 4
Second print statement: 13

Мало того, что нет никаких гарантий относительно порядка обработки, также нет никаких гарантий относительно того, какие элементы будут обрабатываться Например,

IntStream.range(1, 30)
    .filter(i -> i%13 == 1)
    .peek(i -> System.out.println("processing "+i))
    .parallel()
    .findFirst()
    .ifPresent(i -> System.out.println("result is "+i));

производит в моей настройке

processing 14
processing 1
processing 27
result is 1

Таким образом, хотя результат гарантированно будет 1, первый соответствующий элемент в порядке встречи, нет никакой гарантии, что другие элементы, следующие за ним в порядке встречи, не обрабатываются.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...