Подробности реализации потокового сплитератора - PullRequest
6 голосов
/ 02 апреля 2019

Глядя на исходный код WrappingSpliterator::trySplit, я был очень введен в заблуждение относительно его реализации:

    @Override
    public Spliterator<P_OUT> trySplit() {
        if (isParallel && buffer == null && !finished) {
            init();

            Spliterator<P_IN> split = spliterator.trySplit();
            return (split == null) ? null : wrap(split);
        }
        else
            return null;
    }

И если вам интересно, почему это важно, это потому, что, например, это:

Arrays.asList(1,2,3,4,5)
      .stream()
      .filter(x -> x != 1)
      .spliterator();

использует это.В моем понимании добавление любой промежуточной операции в поток вызовет запуск этого кода.

По сути, этот метод говорит, что если поток не является параллельным, обрабатывайте этот Spliterator как тот, который вообще не может быть разделен.И это важно для меня.В одном из моих методов (именно так я и получил этот код), я получаю Stream в качестве ввода и «разбираю» его на более мелкие части вручную, с помощью trySplit.Например, вы можете подумать, что я пытаюсь сделать findLast из Stream.

И именно здесь мое желание разделить на более мелкие куски обнуляется, потому что, как только я это сделаю:

Spliterator<T> sp = stream.spliterator();
Spliterator<T> prefixSplit = sp.trySplit();

, я обнаруживаю, что prefixSplit равно null, что означает, чтоЯ в принципе не могу ничего сделать, кроме как потреблять все sp с forEachRemaning.

И это немного странно, может быть , имеет смысл, когда присутствует filter;потому что в этом случае единственный способ (в моем понимании) Spliterator может быть возвращен с использованием некоторого вида buffer, может быть даже с предопределенным размером (очень похоже на Files::lines).Но почему это:

Arrays.asList(1,2,3,4)
      .stream()
      .sorted()
      .spliterator()
      .trySplit();

возвращает null - это то, что я не понимаю.sorted - это операция с состоянием, которая в любом случае буферизует элементы, фактически не уменьшая или не увеличивая их начальное число, поэтому, по крайней мере, теоретически это может вернуть что-то отличное от null ...

1 Ответ

1 голос
/ 09 мая 2019

Когда вы вызываете spliterator() для Stream, в текущей реализации есть только два возможных результата.

Если в потоке нет промежуточных операций, вы получите исходный сплитератор, который использовалсячтобы создать поток и возможность разделения которого полностью независима от параллельного состояния потока, так как на самом деле сплитератор ничего не знает о потоке.

В противном случае вы получите WrappingSpliterator, чтобудет инкапсулировать источник Spliterator и состояние конвейера, выраженное как PipelineHelper.Эта комбинация Spliterator и PipelineHelper не должна работать параллельно и, фактически, не будет работать в случае distinct(), поскольку WrappingSpliterator получит совершенно другую комбинацию, в зависимости от того, является ли Streamпараллельно или нет.

Для промежуточных операций без сохранения состояния это не имеет значения.Но, как обсуждалось в «, почему tryAdvance stream.spliterator () может накапливать элементы в буфере? », WrappingSpliterator является «универсальной реализацией», которая не учитываетфактический характер конвейера, поэтому его ограничения являются множеством возможных ограничений всех поддерживаемых этапов конвейера.Таким образом, существования одного сценария, который не сработал бы при игнорировании флага parallel, достаточно, чтобы запретить разбиение для всех конвейеров, если оно не равно parallel.

...