Java 8 параллельный поток списка параметров param - PullRequest
2 голосов
/ 11 марта 2019

У меня есть метод:

invokList(List<Object> list);

Этот метод находится внутри фляги, и у меня нет доступа к его исходному коду.Поэтому для этого мне нужно выполнить invokList параллельно, кто-то может помочь в этом?

Идея состоит в том, чтобы разбить список на несколько списков и выполнить invokList параллельно.

Iсделали этот пример:

            import java.util.Arrays;
            import java.util.Collections;
            import java.util.List;

            public class Test {

                public static void main(String[] args) {
                    List<Integer> list = Arrays.asList(1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20);
                    list.parallelStream()
                            .map(Collections::singletonList)
                            .forEach(Test::invokList);
                }

                public static void invokList(List<Integer> list) {
                    try {
                        Thread.sleep(100);
                        System.out.println("The Thread :" + Thread.currentThread().getName() + " is processing this list" + list);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }

Ответы [ 3 ]

5 голосов
/ 11 марта 2019

У Guava есть методы Lists.partition и Iterables.partition , которые делают что-то наподобие того, что вы просите.Скажем, у вас есть большой список и вы хотите обработать его кусками по 5, вы можете сделать:

int batchSize = 5;
Lists.partition(list, batchSize)
   .parallelStream()
   .forEach(batch -> invokeList(batch));
1 голос
/ 12 марта 2019

если вы не хотите вносить дополнительные зависимости, такие как Guava, вы можете написать сборщик, который разделит ваш список на куски:

static <T> Collector<T, List<List<T>>, List<List<T>>> toChunks(int size) {
    return Collector.of(ArrayList::new, (list, value) -> {
        List<T> chunk = list.isEmpty() ? null : list.get(list.size() - 1);
        if (chunk == null || chunk.size() == size) {
            chunk = new ArrayList<>(size);
            list.add(chunk);
        }
        chunk.add(value);
    }, (list1, list2) -> {
        throw new UnsupportedOperationException();
    });
}

и затем назовите его следующим образом:

 List<Integer> list = Arrays.asList(1,26,17,18,19,20);
 list.stream().collect(toChunks(5))
              .parallelStream()
              .forEach(System.out::println);
1 голос
/ 12 марта 2019

Выглядит очень многословно, но вы можете попробовать следующее.Метод runAsync() заставит списки работать параллельно.

private void test(List<Object> list, int chunkSize) throws ExecutionException, InterruptedException {
    AtomicInteger prev = new AtomicInteger(0);
    List<CompletableFuture> futures = new ArrayList<>();
    IntStream.range(1, (int) (chunkSize * (Math.ceil(Math.abs(list.size() / (double) chunkSize)))))
            .filter(i -> i % chunkSize == 0 || i == list.size())
            .forEach(i -> {
                List<Object> chunk = list.subList(prev.get(), i);
                futures.add(CompletableFuture.runAsync(() -> invokeList(chunk)));
                prev.set(i);
            });
    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
}

private void invokeList(List<Object> list) {
    System.out.println("Invoked for: " + list);
}

Я запустил его для списка из 30 целых чисел с размером фрагмента 5 вот так:

public static void main(String[] args) throws ExecutionException, InterruptedException {
    List<Object> list = IntStream.range(0, 30).mapToObj(i1 -> (Object) String.valueOf(i1)).collect(Collectors.toList());
    int chunkSize = 5;
    new Test().test(list, chunkSize);
}

Вывод :

Invoked for: [15, 16, 17, 18, 19]
Invoked for: [0, 1, 2, 3, 4]
Invoked for: [5, 6, 7, 8, 9]
Invoked for: [10, 11, 12, 13, 14]
Invoked for: [20, 21, 22, 23, 24]
...