Вы получаете идеально сбалансированные сплиты здесь.Проблема заключается в том, что каждый раз, когда вы разбиваете последовательность элементов на две половины, представленные двумя экземплярами Spliterator
, вы создаете задание для одной из половин, даже не пытаясь разделить ее дальше, а только подразделив другую половину.
Итак, сразу после первого разделения вы создаете задание, охватывающее 500 000 элементов.Затем вы вызываете trySplit
для остальных 500 000 элементов, получая идеальное разбиение на два фрагмента по 250 000 элементов, создаете еще одну работу, охватывающую один блок из 250 000 элементов, и пытаетесь только разделить другие.И так далее.Это ваш код, создающий несбалансированные задания.
Когда вы изменяете свою первую часть на
// Simply creating some 'test' data
Stream<String> test = LongStream.range(0, 10000000L).mapToObj(i -> i + "-test");
// Creating a future for each split to process concurrently
List<Callable<Long>> callableList = new ArrayList<>();
int workChunkTarget = 5000;
Deque<Spliterator<String>> spliterators = new ArrayDeque<>();
spliterators.add(test.parallel().spliterator());
int totalSplits = 0;
while(!spliterators.isEmpty()) {
Spliterator<String> spliterator = spliterators.pop();
Spliterator<String> prefix;
while(spliterator.estimateSize() > workChunkTarget
&& (prefix = spliterator.trySplit()) != null) {
spliterators.push(spliterator);
spliterator = prefix;
}
totalSplits++;
callableList.add(new Worker(spliterator, "future-" + totalSplits));
}
, вы тихо приближаетесь к желаемому целевому размеру рабочей нагрузки (настолько близко, насколько мы можем, учитывая, что числане являются степенью двойки).
Дизайн Spliterator
работает намного более плавно с такими инструментами, как ForkJoinTask
, где новое задание может быть отправлено после каждого успешного trySplit
, и само задание решит разделить ипорождает новые задания одновременно, когда рабочие потоки не насыщены (например, параллельные потоковые операции выполняются в эталонной реализации).