Какой самый простой способ распараллелить задачу в Java? - PullRequest
36 голосов
/ 06 января 2010

Скажем, у меня есть задача вроде:

for(Object object: objects) {
    Result result = compute(object);
    list.add(result);
}

Какой самый простой способ распараллелить каждый compute () (при условии, что они уже распараллеливаются)?

Мне не нужен ответ, который строго соответствует приведенному выше коду, просто общий ответ. Но если вам нужна дополнительная информация: мои задачи связаны с IO, и это для веб-приложения Spring, и задачи будут выполняться в HTTP-запросе.

Ответы [ 8 ]

56 голосов
/ 06 января 2010

Я бы рекомендовал взглянуть на ExecutorService .

В частности, что-то вроде этого:

ExecutorService EXEC = Executors.newCachedThreadPool();
List<Callable<Result>> tasks = new ArrayList<Callable<Result>>();
for (final Object object: objects) {
    Callable<Result> c = new Callable<Result>() {
        @Override
        public Result call() throws Exception {
            return compute(object);
        }
    };
    tasks.add(c);
}
List<Future<Result>> results = EXEC.invokeAll(tasks);

Обратите внимание, что использование newCachedThreadPool может быть плохим, если objects большой список. Кэшированный пул потоков может создать поток для каждой задачи! Возможно, вы захотите использовать newFixedThreadPool(n), где n - что-то разумное (например, количество ядер, которое у вас есть, при условии, что compute() привязано к процессору).

Вот полный код, который на самом деле выполняется:

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ExecutorServiceExample {
    private static final Random PRNG = new Random();

    private static class Result {
        private final int wait;
        public Result(int code) {
            this.wait = code;
        }
    }

    public static Result compute(Object obj) throws InterruptedException {
        int wait = PRNG.nextInt(3000);
        Thread.sleep(wait);
        return new Result(wait);
    }

    public static void main(String[] args) throws InterruptedException,
        ExecutionException {
        List<Object> objects = new ArrayList<Object>();
        for (int i = 0; i < 100; i++) {
            objects.add(new Object());
        }

        List<Callable<Result>> tasks = new ArrayList<Callable<Result>>();
        for (final Object object : objects) {
            Callable<Result> c = new Callable<Result>() {
                @Override
                public Result call() throws Exception {
                    return compute(object);
                }
            };
            tasks.add(c);
        }

        ExecutorService exec = Executors.newCachedThreadPool();
        // some other exectuors you could try to see the different behaviours
        // ExecutorService exec = Executors.newFixedThreadPool(3);
        // ExecutorService exec = Executors.newSingleThreadExecutor();
        try {
            long start = System.currentTimeMillis();
            List<Future<Result>> results = exec.invokeAll(tasks);
            int sum = 0;
            for (Future<Result> fr : results) {
                sum += fr.get().wait;
                System.out.println(String.format("Task waited %d ms",
                    fr.get().wait));
            }
            long elapsed = System.currentTimeMillis() - start;
            System.out.println(String.format("Elapsed time: %d ms", elapsed));
            System.out.println(String.format("... but compute tasks waited for total of %d ms; speed-up of %.2fx", sum, sum / (elapsed * 1d)));
        } finally {
            exec.shutdown();
        }
    }
}
2 голосов
/ 06 января 2010

Для более подробного ответа прочитайте Параллелизм Java на практике и используйте java.util.concurrent .

1 голос
/ 06 января 2010

Вот что я использую в своих проектах:

public class ParallelTasks
{
    private final Collection<Runnable> tasks = new ArrayList<Runnable>();

    public ParallelTasks()
    {
    }

    public void add(final Runnable task)
    {
        tasks.add(task);
    }

    public void go() throws InterruptedException
    {
        final ExecutorService threads = Executors.newFixedThreadPool(Runtime.getRuntime()
                .availableProcessors());
        try
        {
            final CountDownLatch latch = new CountDownLatch(tasks.size());
            for (final Runnable task : tasks)
                threads.execute(new Runnable() {
                    public void run()
                    {
                        try
                        {
                            task.run();
                        }
                        finally
                        {
                            latch.countDown();
                        }
                    }
                });
            latch.await();
        }
        finally
        {
            threads.shutdown();
        }
    }
}

// ...

public static void main(final String[] args) throws Exception
{
    ParallelTasks tasks = new ParallelTasks();
    final Runnable waitOneSecond = new Runnable() {
        public void run()
        {
            try
            {
                Thread.sleep(1000);
            }
            catch (InterruptedException e)
            {
            }
        }
    };
    tasks.add(waitOneSecond);
    tasks.add(waitOneSecond);
    tasks.add(waitOneSecond);
    tasks.add(waitOneSecond);
    final long start = System.currentTimeMillis();
    tasks.go();
    System.err.println(System.currentTimeMillis() - start);
}

Который печатает чуть больше 2000 на моей двухъядерной коробке.

0 голосов
/ 27 июня 2018

В Java8 и более поздних версиях вы можете создать поток, а затем выполнить обработку параллельно с parallelStream :

List<T> objects = ...;

List<Result> result = objects.parallelStream().map(object -> {
            return compute(object);
        }).collect(Collectors.toList());

Примечание: порядок результатов может не соответствовать порядку объектов в списке.

Подробные сведения о том, как настроить правильное количество потоков, доступны в этом вопросе о стековом потоке Сколько потоков создано в параллельном потоке в java-8

0 голосов
/ 06 января 2010

Fork / Join параллельный массив является одним из вариантов

0 голосов
/ 06 января 2010

Я собирался упомянуть класс исполнителя. Вот пример кода, который вы бы поместили в класс executor.

    private static ExecutorService threadLauncher = Executors.newFixedThreadPool(4);

    private List<Callable<Object>> callableList = new ArrayList<Callable<Object>>();

    public void addCallable(Callable<Object> callable) {
        this.callableList.add(callable);
    }

    public void clearCallables(){
        this.callableList.clear();
    }

    public void executeThreads(){
        try {
        threadLauncher.invokeAll(this.callableList);
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    public Object[] getResult() {

        List<Future<Object>> resultList = null;
        Object[] resultArray = null;
        try {

            resultList = threadLauncher.invokeAll(this.callableList);

            resultArray = new Object[resultList.size()];

            for (int i = 0; i < resultList.size(); i++) {
                resultArray[i] = resultList.get(i).get();
            }

        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        return resultArray;
    }

Затем, чтобы использовать его, вы должны вызывать класс executor для его заполнения и выполнения.

executor.addCallable( some implementation of callable) // do this once for each task 
Object[] results = executor.getResult();
0 голосов
/ 06 января 2010

Можно просто создать несколько потоков и получить результат.

Thread t = new Mythread(object);

if (t.done()) {
   // get result
   // add result
}

РЕДАКТИРОВАТЬ: я думаю, что другие решения круче.

0 голосов
/ 06 января 2010

Вы можете использовать ThreadPoolExecutor . Вот пример кода: http://programmingexamples.wikidot.com/threadpoolexecutor (слишком долго, чтобы привести его сюда)

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...