Внутренние изменения для лимита и неупорядоченного потока - PullRequest
0 голосов
/ 27 апреля 2018

В основном это произошло при попытке ответить на другой вопрос. Предположим, этот код:

AtomicInteger i = new AtomicInteger(0);
AtomicInteger count = new AtomicInteger(0);
IntStream.generate(() -> i.incrementAndGet())
        .parallel()
        .peek(x -> count.incrementAndGet())
        .limit(5)
        .forEach(System.out::println);

System.out.println("count = " + count);

Я понимаю тот факт, что IntStream#generate является неупорядоченным бесконечным потоком , и для его завершения должна быть операция короткого замыкания (limit в данном случае). Я также понимаю, что Supplier можно вызывать столько раз, сколько ощущается реализацией Stream, до достижения этого предела.

Выполнение этого в java-8 вывело бы count всегда 512 (может быть не всегда, но так на моей машине).

На контрасте, работающем под java-10, редко превышает 5. Поэтому мой вопрос заключается в том, что изменилось внутренне, так что короткое замыкание происходит намного лучше (я пытаюсь ответить на него самостоятельно, имея источники и пытаясь сделать некоторые различия ...)

1 Ответ

0 голосов
/ 27 апреля 2018

Изменение произошло где-то между Java 9, бета 103 и Java 9, бета 120 ( JDK ‑ 8154387 ).

Ответственный класс - StreamSpliterators.UnorderedSliceSpliterator.OfInt, соотв. его супер класс StreamSpliterators.UnorderedSliceSpliterator.

Старая версия класса выглядела как

abstract static class UnorderedSliceSpliterator<T, T_SPLITR extends Spliterator<T>> {
    static final int CHUNK_SIZE = 1 << 7;

    // The spliterator to slice
    protected final T_SPLITR s;
    protected final boolean unlimited;
    private final long skipThreshold;
    private final AtomicLong permits;

    UnorderedSliceSpliterator(T_SPLITR s, long skip, long limit) {
        this.s = s;
        this.unlimited = limit < 0;
        this.skipThreshold = limit >= 0 ? limit : 0;
        this.permits = new AtomicLong(limit >= 0 ? skip + limit : skip);
    }

    UnorderedSliceSpliterator(T_SPLITR s,
                              UnorderedSliceSpliterator<T, T_SPLITR> parent) {
        this.s = s;
        this.unlimited = parent.unlimited;
        this.permits = parent.permits;
        this.skipThreshold = parent.skipThreshold;
    }

...

        @Override
        public void forEachRemaining(Consumer<? super T> action) {
            Objects.requireNonNull(action);

            ArrayBuffer.OfRef<T> sb = null;
            PermitStatus permitStatus;
            while ((permitStatus = permitStatus()) != PermitStatus.NO_MORE) {
                if (permitStatus == PermitStatus.MAYBE_MORE) {
                    // Optimistically traverse elements up to a threshold of CHUNK_SIZE
                    if (sb == null)
                        sb = new ArrayBuffer.OfRef<>(CHUNK_SIZE);
                    else
                        sb.reset();
                    long permitsRequested = 0;
                    do { } while (s.tryAdvance(sb) && ++permitsRequested < CHUNK_SIZE);
                    if (permitsRequested == 0)
                        return;
                    sb.forEach(action, acquirePermits(permitsRequested));
                }
                else {
                    // Must be UNLIMITED; let 'er rip
                    s.forEachRemaining(action);
                    return;
                }
            }
        }

Как мы видим, он пытается буферизовать до CHUNK_SIZE = 1 << 7 элементов в каждом сплитераторе, что может привести к «количеству ядер ЦП» × 128 элементов.

Напротив, новая версия выглядит как

abstract static class UnorderedSliceSpliterator<T, T_SPLITR extends Spliterator<T>> {
    static final int CHUNK_SIZE = 1 << 7;

    // The spliterator to slice
    protected final T_SPLITR s;
    protected final boolean unlimited;
    protected final int chunkSize;
    private final long skipThreshold;
    private final AtomicLong permits;

    UnorderedSliceSpliterator(T_SPLITR s, long skip, long limit) {
        this.s = s;
        this.unlimited = limit < 0;
        this.skipThreshold = limit >= 0 ? limit : 0;
        this.chunkSize = limit >= 0 ? (int)Math.min(CHUNK_SIZE,
            ((skip + limit) / AbstractTask.LEAF_TARGET) + 1) : CHUNK_SIZE;
        this.permits = new AtomicLong(limit >= 0 ? skip + limit : skip);
    }

    UnorderedSliceSpliterator(T_SPLITR s,
                              UnorderedSliceSpliterator<T, T_SPLITR> parent) {
        this.s = s;
        this.unlimited = parent.unlimited;
        this.permits = parent.permits;
        this.skipThreshold = parent.skipThreshold;
        this.chunkSize = parent.chunkSize;
    }

...

        @Override
        public void forEachRemaining(Consumer<? super T> action) {
            Objects.requireNonNull(action);

            ArrayBuffer.OfRef<T> sb = null;
            PermitStatus permitStatus;
            while ((permitStatus = permitStatus()) != PermitStatus.NO_MORE) {
                if (permitStatus == PermitStatus.MAYBE_MORE) {
                    // Optimistically traverse elements up to a threshold of chunkSize
                    if (sb == null)
                        sb = new ArrayBuffer.OfRef<>(chunkSize);
                    else
                        sb.reset();
                    long permitsRequested = 0;
                    do { } while (s.tryAdvance(sb) && ++permitsRequested < chunkSize);
                    if (permitsRequested == 0)
                        return;
                    sb.forEach(action, acquirePermits(permitsRequested));
                }
                else {
                    // Must be UNLIMITED; let 'er rip
                    s.forEachRemaining(action);
                    return;
                }
            }
        }

Итак, теперь есть поле экземпляра chunkSize. Когда есть определенный предел, и выражение ((skip + limit) / AbstractTask.LEAF_TARGET) + 1 оценивается как меньшее значение, чем CHUNK_SIZE, будет использоваться это меньшее значение. Таким образом, при небольших ограничениях chunkSize будет намного меньше. В вашем случае с пределом 5 размер чанка всегда будет 1.

...