Могу ли я разделить поток на несколько более мелких потоков - PullRequest
0 голосов
/ 05 мая 2020

Есть несколько вопросов для потоков, но для этого варианта использования & в java не нашел ни одного.

У меня огромный поток объектов Stream<A> [~ 1 миллион объектов]. StreamA исходит из файла.

Class A { enum status [Running,queued,Completed], String name }

Я хочу разделить Stream<A> на три потока без использования каких-либо инструкций Collect. Оператор Collect загружает все в память.

Я сталкиваюсь с StackOverflowException, поскольку я вызываю stream.concat несколько раз здесь.

Stream.Concat имеет проблему, упомянутую в Java Документы "Примечание по реализации: будьте осторожны при создании потоков из повторяющейся конкатенации. Accessing an element of a deeply concatenated stream can result in deep call chains, or even StackOverflowException."

Map<Status, Stream<String>> splitStream = new HashMap<>();
streamA.foreach(aObj -> 
Stream<String> statusBasedStream = splitStream.getOrDefault(aObj.status,Stream.of());
splitStream.put(aObj.status, Stream.concat(statusBasedStream, Stream.of(aObj.name))); 

Есть несколько вариантов, в которых пользовательские потоки доступны в github для достижения конкатенации, но при этом хотелось бы использовать стандартные библиотеки для решения это.

Если объем данных меньше, использовался бы подход со списком, как указано здесь ( Разделить поток на подпотоки с N элементами )

1 Ответ

0 голосов
/ 05 мая 2020

Не точное решение проблемы, но если у вас есть информация об индексах, тогда комбинация Stream.skip() и Stream.limit() может помочь в этом - Ниже показан фиктивный код, который я пробовал -

    int queuedNumbers = 100;
    int runningNumbers=200;
    Stream<Object> all = Stream.of();
    Stream<Object> queuedAndCompleted = all.skip(queuedNumbers);
    Stream<Object> queued = all.limit(queuedNumbers);
    Stream<Object> running = queuedAndCompleted.limit(runningNumbers);
    Stream<Object> completed = queuedAndCompleted.skip(runningNumbers);

Надеюсь, это поможет.

...