Java Spliterator: Как обрабатывать большие разделения потока одинаково? - PullRequest
3 голосов
/ 07 июля 2019

Код, с которым я работаю

package com.skimmer;

import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.LongStream;
import java.util.stream.Stream;

public class App {

  public static void main(String[] args) throws InterruptedException, ExecutionException {

    // Simply creating some 'test' data
    Stream<String> test = LongStream.range(0, 10000000L).mapToObj(i -> i + "-test");

    Spliterator<String> spliterator = test.parallel().spliterator();
    List<Callable<Long>> callableList = new ArrayList<Callable<Long>>();

    // Creating a future for each split to process concurrently
    int totalSplits = 0;
    while ((spliterator = spliterator.trySplit()) != null) {

      totalSplits++;
      callableList.add(new Worker(spliterator, "future-" + totalSplits));
    }

    ExecutorService executor = Executors.newFixedThreadPool(totalSplits);
    List<Future<Long>> futures = executor.invokeAll(callableList);
    AtomicLong counter = new AtomicLong(0);

    for (Future<Long> future : futures)
      counter.getAndAdd(future.get());

    System.out.println("Total processed " + counter.get());
    System.out.println("Total splits " + totalSplits);

    executor.shutdown();
  }

  public static class Worker implements Callable<Long> {

    private Spliterator<String> spliterator;
    private String name;

    public Worker(Spliterator<String> spliterator, String name) {
      this.spliterator = spliterator;
      this.name = name;
    }

    @Override
    public Long call() {

      AtomicLong counter = new AtomicLong(0);
      spliterator.forEachRemaining(s -> {

        // We'll assume busy processing code here
        counter.getAndIncrement();

      });

      System.out.println(name + " Total processed : " + counter.get());

      return counter.get();
    }
  }
}

Выход

furture-11 Total processed : 244
furture-10 Total processed : 488
furture-9 Total processed : 977
furture-12 Total processed : 122
furture-7 Total processed : 3906
furture-13 Total processed : 61
furture-8 Total processed : 1953
furture-6 Total processed : 7813
furture-14 Total processed : 31
furture-5 Total processed : 15625
furture-15 Total processed : 15
furture-4 Total processed : 31250
furture-17 Total processed : 4
furture-18 Total processed : 2
furture-19 Total processed : 1
furture-16 Total processed : 8
furture-3 Total processed : 62500
furture-2 Total processed : 125000
furture-1 Total processed : 250000
future-0 Total processed : 500000
Total processed 1000000
Total splits 20

Моя проблема / Вопрос: Первый trySplit (и будущая задача 'future-0') получает ровно n / 2 элементов для начала обработки. Первые расщепления пары занимают много времени - это ухудшается с ростом n. Есть ли другой способ обработки потока, где каждое будущее / вызываемое получает равное распределение элементов для обработки, например (N / split) т.е. 1000000/20 = 50000

Желаемые результаты

furture-11 Total processed : 50000
furture-10 Total processed : 50000
furture-9 Total processed : 50000
furture-12 Total processed : 50000
furture-7 Total processed : 50000
furture-13 Total processed : 50000
furture-8 Total processed : 50000
furture-6 Total processed : 50000
furture-14 Total processed : 50000
furture-5 Total processed : 50000
furture-15 Total processed : 50000
furture-4 Total processed : 50000
furture-17 Total processed : 50000
furture-18 Total processed : 50000
furture-19 Total processed : 50000
furture-16 Total processed : 50000
furture-3 Total processed : 50000
furture-2 Total processed : 50000
furture-1 Total processed : 50000
future-0 Total processed : 50000
Total processed 1000000
Total splits 20

Последующий вопрос: Если Spliterator не может сделать это, какой другой подход / решение лучше всего использовать для одновременной обработки больших потоков.

Практический сценарий: Обработка большого (6 ГБ) файла CSV, который слишком велик для хранения в памяти

1 Ответ

4 голосов
/ 08 июля 2019

Вы получаете идеально сбалансированные сплиты здесь.Проблема заключается в том, что каждый раз, когда вы разбиваете последовательность элементов на две половины, представленные двумя экземплярами Spliterator, вы создаете задание для одной из половин, даже не пытаясь разделить ее дальше, а только подразделив другую половину.

Итак, сразу после первого разделения вы создаете задание, охватывающее 500 000 элементов.Затем вы вызываете trySplit для остальных 500 000 элементов, получая идеальное разбиение на два фрагмента по 250 000 элементов, создаете еще одну работу, охватывающую один блок из 250 000 элементов, и пытаетесь только разделить другие.И так далее.Это ваш код, создающий несбалансированные задания.

Когда вы изменяете свою первую часть на

// Simply creating some 'test' data
Stream<String> test = LongStream.range(0, 10000000L).mapToObj(i -> i + "-test");
// Creating a future for each split to process concurrently
List<Callable<Long>> callableList = new ArrayList<>();
int workChunkTarget = 5000;
Deque<Spliterator<String>> spliterators = new ArrayDeque<>();
spliterators.add(test.parallel().spliterator());
int totalSplits = 0;
while(!spliterators.isEmpty()) {
    Spliterator<String> spliterator = spliterators.pop();
    Spliterator<String> prefix;
    while(spliterator.estimateSize() > workChunkTarget
              && (prefix = spliterator.trySplit()) != null) {
        spliterators.push(spliterator);
        spliterator = prefix;
    }
    totalSplits++;
    callableList.add(new Worker(spliterator, "future-" + totalSplits));
}

, вы тихо приближаетесь к желаемому целевому размеру рабочей нагрузки (настолько близко, насколько мы можем, учитывая, что числане являются степенью двойки).

Дизайн Spliterator работает намного более плавно с такими инструментами, как ForkJoinTask, где новое задание может быть отправлено после каждого успешного trySplit, и само задание решит разделить ипорождает новые задания одновременно, когда рабочие потоки не насыщены (например, параллельные потоковые операции выполняются в эталонной реализации).

...