"Parallel.For" для Java? - PullRequest

"Parallel.For" для Java?

70 голосов
/ 25 октября 2010

Мне было интересно, есть ли Parallel.For , эквивалентный версии .net для Java?

Если есть кто-нибудь, пожалуйста, приведите пример?спасибо!

Ответы [ 11 ]

106 голосов
/ 25 октября 2010

Полагаю, самое близкое было бы:

ExecutorService exec = Executors.newFixedThreadPool(SOME_NUM_OF_THREADS);
try {
    for (final Object o : list) {
        exec.submit(new Runnable() {
            public void run() {
                // do stuff with o.
} finally {

Исходя из комментариев TheLQ, вы установили бы SUM_NUM_THREADS на Runtime.getRuntime().availableProcessors();

Редактировать: решено добавить базовую "Параллель". Для"реализация

public class Parallel {
    private static final int NUM_CORES = Runtime.getRuntime().availableProcessors();

    private static final ExecutorService forPool = Executors.newFixedThreadPool(NUM_CORES * 2, new NamedThreadFactory("Parallel.For"));

    public static <T> void For(final Iterable<T> elements, final Operation<T> operation) {
        try {
            // invokeAll blocks for us until all submitted tasks in the call complete
            forPool.invokeAll(createCallables(elements, operation));
        } catch (InterruptedException e) {

    public static <T> Collection<Callable<Void>> createCallables(final Iterable<T> elements, final Operation<T> operation) {
        List<Callable<Void>> callables = new LinkedList<Callable<Void>>();
        for (final T elem : elements) {
            callables.add(new Callable<Void>() {
                public Void call() {
                    return null;

        return callables;

    public static interface Operation<T> {
        public void perform(T pParameter);

Пример использования Parallel.For

// Collection of items to process in parallel
Collection<Integer> elems = new LinkedList<Integer>();
for (int i = 0; i < 40; ++i) {
 // The operation to perform with each item
 new Parallel.Operation<Integer>() {
    public void perform(Integer param) {

Я думаю, что эта реализация действительно больше похожа на Parallel.ForEach

Редактировать Я размещаю это на GitHub, если кому-то интересно. Параллельно для на GitHub

10 голосов
/ 01 мая 2011

Решение MLaw является очень практичным Parallel.ForEach.Я добавил небольшую модификацию, чтобы сделать Parallel.For.

public class Parallel
static final int iCPU = Runtime.getRuntime().availableProcessors();

public static <T> void ForEach(Iterable <T> parameters,
                   final LoopBody<T> loopBody)
    ExecutorService executor = Executors.newFixedThreadPool(iCPU);
    List<Future<?>> futures  = new LinkedList<Future<?>>();

    for (final T param : parameters)
        Future<?> future = executor.submit(new Runnable()
            public void run() { loopBody.run(param); }


    for (Future<?> f : futures)
        try   { f.get(); }
        catch (InterruptedException e) { } 
        catch (ExecutionException   e) { }         


public static void For(int start,
                   int stop,
               final LoopBody<Integer> loopBody)
    ExecutorService executor = Executors.newFixedThreadPool(iCPU);
    List<Future<?>> futures  = new LinkedList<Future<?>>();

    for (int i=start; i<stop; i++)
        final Integer k = i;
        Future<?> future = executor.submit(new Runnable()
            public void run() { loopBody.run(k); }

    for (Future<?> f : futures)
        try   { f.get(); }
        catch (InterruptedException e) { } 
        catch (ExecutionException   e) { }         


public interface LoopBody <T>
    void run(T i);

public class ParallelTest
int k;  

public ParallelTest()
    k = 0;
    Parallel.For(0, 10, new LoopBody <Integer>()
        public void run(Integer i)
            k += i;
    System.out.println("Sum = "+ k);

public static void main(String [] argv)
    ParallelTest test = new ParallelTest();
8 голосов
/ 13 августа 2011

Построен по предложению mlaw, добавьте CountDownLatch.Добавьте chunksize, чтобы уменьшить submit ().

При тестировании с массивом из 4 миллионов элементов этот показатель в 5 раз быстрее по сравнению с () для моего процессора Core i7 2630QM.

public class Loop {
    public interface Each {
        void run(int i);

    private static final int CPUs = Runtime.getRuntime().availableProcessors();

    public static void withIndex(int start, int stop, final Each body) {
        int chunksize = (stop - start + CPUs - 1) / CPUs;
        int loops = (stop - start + chunksize - 1) / chunksize;
        ExecutorService executor = Executors.newFixedThreadPool(CPUs);
        final CountDownLatch latch = new CountDownLatch(loops);
        for (int i=start; i<stop;) {
            final int lo = i;
            i += chunksize;
            final int hi = (i<stop) ? i : stop;
            executor.submit(new Runnable() {
                public void run() {
                    for (int i=lo; i<hi; i++)
        try {
        } catch (InterruptedException e) {}

    public static void main(String [] argv) {
        Loop.withIndex(0, 9, new Loop.Each() {
            public void run(int i) {
5 голосов
/ 18 декабря 2013

Вот мой вклад в эту тему https://github.com/pablormier/parallel-loops. Использование очень простое:

Collection<String> upperCaseWords = 
    Parallel.ForEach(words, new Parallel.F<String, String>() {
        public String apply(String s) {
            return s.toUpperCase();

Также возможно изменить некоторые аспекты поведения, например количество потоков (по умолчанию используетсяпул кэшированных потоков):

Collection<String> upperCaseWords = 
            new Parallel.ForEach<String, String>(words)
                .apply(new Parallel.F<String, String>() {
                    public String apply(String s) {
                        return s.toUpperCase();

Весь код самодостаточен в одном классе Java и не имеет больше зависимостей, чем JDK.Я также призываю вас проверить новый способ распараллеливания в функциональном стиле с Java 8

5 голосов
/ 25 октября 2010

Платформа объединения вилок в Java 7 предназначена для поддержки параллелизма Но я не знаю точного эквивалента для Parallel.For.

4 голосов
/ 25 октября 2010

Более простой вариант будет

// A thread pool which runs for the life of the application.
private static final ExecutorService EXEC = 

EXEC.invokeAll(tasks); // you can optionally specify a timeout.
3 голосов
/ 28 июля 2014

Синхронизация часто убивает ускорение параллельных циклов for. Поэтому параллельным циклам for часто требуются свои личные данные и механизм сокращения, чтобы свести все потоки частных данных к единому результату.

Итак, я расширил Parallel.For версии Weimin Xiao с помощью механизма сокращения.

public class Parallel {
public static interface IntLoopBody {
    void run(int i);

public static interface LoopBody<T> {
    void run(T i);

public static interface RedDataCreator<T> {
    T run();

public static interface RedLoopBody<T> {
    void run(int i, T data);

public static interface Reducer<T> {
    void run(T returnData, T addData);

private static class ReductionData<T> {
    Future<?> future;
    T data;

static final int nCPU = Runtime.getRuntime().availableProcessors();

public static <T> void ForEach(Iterable <T> parameters, final LoopBody<T> loopBody) {
    ExecutorService executor = Executors.newFixedThreadPool(nCPU);
    List<Future<?>> futures  = new LinkedList<Future<?>>();

    for (final T param : parameters) {
        futures.add(executor.submit(() -> loopBody.run(param) ));

    for (Future<?> f : futures) {
        try { 
        } catch (InterruptedException | ExecutionException e) { 

public static void For(int start, int stop, final IntLoopBody loopBody) {
    final int chunkSize = (stop - start + nCPU - 1)/nCPU;
    final int loops = (stop - start + chunkSize - 1)/chunkSize;
    ExecutorService executor = Executors.newFixedThreadPool(loops);
    List<Future<?>> futures  = new LinkedList<Future<?>>();

    for (int i=start; i < stop; ) {
        final int iStart = i;
        i += chunkSize;
        final int iStop = (i < stop) ? i : stop;

        futures.add(executor.submit(() -> {
            for (int j = iStart; j < iStop; j++) 

    for (Future<?> f : futures) {
        try { 
        } catch (InterruptedException | ExecutionException e) { 

public static <T> void For(int start, int stop, T result, final RedDataCreator<T> creator, final RedLoopBody<T> loopBody, final Reducer<T> reducer) {
    final int chunkSize = (stop - start + nCPU - 1)/nCPU;
    final int loops = (stop - start + chunkSize - 1)/chunkSize;
    ExecutorService executor = Executors.newFixedThreadPool(loops);
    List<ReductionData<T>> redData  = new LinkedList<ReductionData<T>>();

    for (int i = start; i < stop; ) {
        final int iStart = i;
        i += chunkSize;
        final int iStop = (i < stop) ? i : stop;
        final ReductionData<T> rd = new ReductionData<T>();

        rd.data = creator.run();
        rd.future = executor.submit(() -> {
            for (int j = iStart; j < iStop; j++) {
                loopBody.run(j, rd.data);

    for (ReductionData<T> rd : redData) {
        try { 
            if (rd.data != null) {
                reducer.run(result, rd.data);
        } catch (InterruptedException | ExecutionException e) { 

Вот простой пример теста: счетчик параллельных символов с использованием несинхронизированной карты.

import java.util.*;

public class ParallelTest {
static class Counter {
    int cnt;

    Counter() {
        cnt = 1;

public static void main(String[] args) {
    String text = "More formally, if this map contains a mapping from a key k to a " + 
            "value v such that key compares equal to k according to the map's ordering, then " +
            "this method returns v; otherwise it returns null.";
    Map<Character, Counter> charCounter1 = new TreeMap<Character, Counter>();
    Map<Character, Counter> charCounter2 = new TreeMap<Character, Counter>();

    // first sequentially
    for(int i=0; i < text.length(); i++) {
        char c = text.charAt(i);
        Counter cnt = charCounter1.get(c);
        if (cnt == null) {
            charCounter1.put(c, new Counter());
        } else {
    for(Map.Entry<Character, Counter> entry: charCounter1.entrySet()) {
        System.out.println(entry.getKey() + ": " + entry.getValue().cnt);

    // now parallel without synchronization
    Parallel.For(0, text.length(), charCounter2,
        // Creator
        () -> new TreeMap<Character, Counter>(), 
        // Loop Body
        (i, map) -> {
            char c = text.charAt(i);
            Counter cnt = map.get(c);
            if (cnt == null) {
                map.put(c, new Counter());
            } else {
        // Reducer
        (result, map) -> {
            for(Map.Entry<Character, Counter> entry: map.entrySet()) {
                Counter cntR = result.get(entry.getKey());
                if (cntR == null) {
                    result.put(entry.getKey(), entry.getValue());
                } else {
                    cntR.cnt += entry.getValue().cnt;

    // compare results
    assert charCounter1.size() == charCounter2.size() : "wrong size: " + charCounter1.size() + ", " + charCounter2.size();
    Iterator<Map.Entry<Character, Counter>> it2 = charCounter2.entrySet().iterator();
    for(Map.Entry<Character, Counter> entry: charCounter1.entrySet()) {
        Map.Entry<Character, Counter> entry2 = it2.next();
        assert entry.getKey() == entry2.getKey() && entry.getValue().cnt == entry2.getValue().cnt : "wrong content";

    System.out.println("Well done!");
3 голосов
/ 17 мая 2011

Существует эквивалент для Parallel.For, доступный как расширение Java. Он называется Ateji PX, у них есть бесплатная версия, с которой можно играть. http://www.ateji.com/px/index.html

Это точный эквивалент параллеля .for и похож на.

For ||

Больше примеров и объяснений в википедии: http://en.wikipedia.org/wiki/Ateji_PX

Закрытая вещь в Java IMO

1 голос
/ 10 июня 2019

Я обнаружил, что ForkJoinPool и IntStream очень полезны в моем случае (Parallel For с ограниченным числом потоков).

C #:

static void MathParallel(int threads)
            Parallel.For(1, partitions, new ParallelOptions { MaxDegreeOfParallelism = threads }, (i) => {
                partitionScores[i] = Math.Sin(3*i);

и эквивалент Java:

static void mathParallel(int threads) {
        ForkJoinPool pool = new ForkJoinPool(threads);
            pool.submit(()-> IntStream.range(0, partitions).parallel().forEach(i -> {
                partitionScores[i] = Math.sin(3*i);
        while (!pool.isTerminated()){
1 голос
/ 14 марта 2012

У меня есть обновленный класс Java Parallel, который может выполнять Parallel.For, Parallel.ForEach, Parallel.Tasks и секционированный параллельный цикл. Исходный код выглядит следующим образом:

Примеры использования этих параллельных циклов следующие:

public static void main(String [] argv)
    //sample data
    final ArrayList<String> ss = new ArrayList<String>();

    String [] s = {"a", "b", "c", "d", "e", "f", "g"};
    for (String z : s) ss.add(z);
    int m = ss.size();

    //parallel-for loop
    System.out.println("Parallel.For loop:");
    Parallel.For(0, m, new LoopBody<Integer>()
        public void run(Integer i)
           System.out.println(i +"\t"+ ss.get(i));   

   //parallel for-each loop
   System.out.println("Parallel.ForEach loop:");
   Parallel.ForEach(ss, new LoopBody<String>()
       public void run(String p)

   //partitioned parallel loop
   System.out.println("Partitioned Parallel loop:");
   Parallel.ForEach(Parallel.create(0, m), new LoopBody<Partition>()
       public void run(Partition p)
           for(int i=p.start; i<p.end; i++)
               System.out.println(i +"\t"+ ss.get(i));

   //parallel tasks
   System.out.println("Parallel Tasks:");
   Parallel.Tasks(new Task []
       new Task() {public void run()
           for(int i=0; i<3; i++)
               System.out.println(i +"\t"+ ss.get(i));

       new Task() {public void run()
           for (int i=3; i<6; i++)
               System.out.println(i +"\t"+ ss.get(i));
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.