Проблема в строительстве Java Lazy Stream - PullRequest
3 голосов
/ 08 октября 2019

Я просматривал эту ссылку для создания отложенного потока и пытался использовать ее для одного из моих случаев.

В моем основном потоке есть некоторые операции, которые необходимо выполнить на Stream.onClose () .

В моей собственной логике я использую Iterator из Stream.iterator () для обработки потока.

Это работало нормально, потребляяфактический поток. Но когда я использовал Stream.flatMap () для создания отложенного потока, функция onClose вызывается, когда я начинаю итерацию, что, в свою очередь, создает проблемы для меня.

Я пыталсяэто в zulu-opendjk 1.8.0_222 и 13. Я сталкиваюсь с этим исключением в обеих средах.

enter image description here

Вы можете воспроизвести проблему с помощью следующихкод.

import java.util.*;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class TestStreamIterator
{
    public static void main(String args[])
    {
        Stream<String> stream1 = getStream();
        stream1.iterator().forEachRemaining(System.out::println);

        Stream<String> stream2 = Stream.of(1).flatMap(integer -> getStream());
        stream2.iterator().forEachRemaining(System.out::println);
    }

    private static Stream<String> getStream()
    {
        List<String> values = Arrays.asList("a", "b", "c");

        MyIterator iterator = new MyIterator(values);
        Stream<String> stream = StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.NONNULL | Spliterator.IMMUTABLE), false).onClose(iterator::close);

        return stream;
    }

    private static class MyIterator implements Iterator<String>, AutoCloseable
    {
        private Iterator<String> iterator;

        public MyIterator(List<String> values)
        {
            iterator = values.iterator();
        }

        @Override
        public boolean hasNext()
        {
            return iterator.hasNext();
        }

        @Override
        public String next()
        {
            return iterator.next();
        }

        @Override
        public void close()
        {
            throw new IllegalStateException("Should not come here");
        }
    }
}

Мое занижение при использовании flatMap;close должен вызываться только метод Stream.of(1). Не для потока, созданного внутри функции flatMap.

Я ожидал, что функция onClose будет вызываться только при закрытии потока. Но я не уверен, где поток закрывается.

Любая помощь в решении этого дела также будет полезна.

1 Ответ

4 голосов
/ 08 октября 2019

Когда вы звоните flatMap(integer -> getStream()) здесь:

Stream<String> stream2 = Stream.of(1).flatMap(integer -> getStream());
stream2.iterator().forEachRemaining(System.out::println);

Вы вызываете этот метод:

Итератор

<R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper);

Возвращает поток, состоящий из результатов замены каждого элемента этого потока содержимым отображенного потока, созданного с использованием предоставленной функции отображенияк каждому элементу. Каждый сопоставленный поток закрывается после помещения его содержимого в этот поток . (Если сопоставленный поток равен нулю, вместо него используется пустой поток.) ​​

Таким образом, как сказано в документации, сопоставленный поток вы передаете в этот метод (из getStream(), который являетсяПоток через MyIterator) будет закрыт, и он (как определено в onClose этого потока) вызывает MyIterator.close(), что вызывает исключение.


Обращаясь к вашему комментарию свы, кажется, не следите:

Stream<String> stream2 = Stream.of(1).flatMap(integer -> getStream());

Создает поток, который будет лениво сопоставляться с содержимым подпотока, когда вы его читаете. Когда этот подпоток загружен в основной поток, подпоток будет закрыт.

stream2.iterator().forEachRemaining(System.out::println);

Вы читаете из основного потока, который сопоставляется с подпотоком, который читает весь подпотокзатем закрывает подпоток, который затем вызывает Stream.onClose(), который вызывает MyIterator.close()

...