Java: подождите в цикле, пока задачи ThreadPoolExecutor не будут выполнены, прежде чем продолжить - PullRequest
1 голос
/ 10 июля 2019

Я работаю над параллельным алгоритмом Дейкстры. Для каждого узла сделаны потоки для просмотра всех ребер текущего узла. Это было сделано параллельно с потоками, но слишком много накладных расходов. Это привело к более длительному времени, чем последовательная версия алгоритма.

ThreadPool был добавлен для решения этой проблемы, но у меня возникли проблемы с ожиданием выполнения задач, прежде чем я смогу перейти к следующей итерации. Только после того, как все задачи для одного узла выполнены, мы должны двигаться дальше. Нам нужны результаты всех задач, прежде чем я смогу найти ближайший по узлу узел.

Я попытался выполнить executor.shutdown (), но с этим подходом он не будет принимать новые задачи. Как мы можем ждать в цикле, пока каждая задача не будет завершена без необходимости каждый раз объявлять ThreadPoolExecutor. Делая это, вы избавитесь от цели меньших накладных расходов, используя это вместо обычных потоков.

Одна вещь, о которой я подумал, это BlockingQueue, который добавляет задачи (ребра). Но и для этого решения я застрял в ожидании завершения задач без shudown ().

public void apply(int numberOfThreads) {
        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(numberOfThreads);

        class DijkstraTask implements Runnable {

            private String name;

            public DijkstraTask(String name) {
                this.name = name;
            }

            public String getName() {
                return name;
            }

            @Override
            public void run() {
                calculateShortestDistances(numberOfThreads);
            }
        }

        // Visit every node, in order of stored distance
        for (int i = 0; i < this.nodes.length; i++) {

            //Add task for each node
            for (int t = 0; t < numberOfThreads; t++) {
                executor.execute(new DijkstraTask("Task " + t));
            }

            //Wait until finished?
            while (executor.getActiveCount() > 0) {
                System.out.println("Active count: " + executor.getActiveCount());
            }

            //Look through the results of the tasks and get the next node that is closest by
            currentNode = getNodeShortestDistanced();

            //Reset the threadCounter for next iteration
            this.setCount(0);
        }
    }

Количество ребер делится на количество нитей. Таким образом, 8 ребер и 2 потока означают, что каждый поток будет иметь дело с 4 ребрами параллельно.

public void calculateShortestDistances(int numberOfThreads) {

        int threadCounter = this.getCount();
        this.setCount(count + 1);

        // Loop round the edges that are joined to the current node
        currentNodeEdges = this.nodes[currentNode].getEdges();

        int edgesPerThread = currentNodeEdges.size() / numberOfThreads;
        int modulo = currentNodeEdges.size() % numberOfThreads;
        this.nodes[0].setDistanceFromSource(0);
        //Process the edges per thread
        for (int joinedEdge = (edgesPerThread * threadCounter); joinedEdge < (edgesPerThread * (threadCounter + 1)); joinedEdge++) {

            System.out.println("Start: " + (edgesPerThread * threadCounter) + ". End: " + (edgesPerThread * (threadCounter + 1) + ".JoinedEdge: " + joinedEdge) + ". Total: " + currentNodeEdges.size());
            // Determine the joined edge neighbour of the current node
            int neighbourIndex = currentNodeEdges.get(joinedEdge).getNeighbourIndex(currentNode);

            // Only interested in an unvisited neighbour
            if (!this.nodes[neighbourIndex].isVisited()) {
                // Calculate the tentative distance for the neighbour
                int tentative = this.nodes[currentNode].getDistanceFromSource() + currentNodeEdges.get(joinedEdge).getLength();
                // Overwrite if the tentative distance is less than what's currently stored
                if (tentative < nodes[neighbourIndex].getDistanceFromSource()) {
                    nodes[neighbourIndex].setDistanceFromSource(tentative);
                }
            }
        }

        //if we have a modulo above 0, the last thread will process the remaining edges
        if (modulo > 0 && numberOfThreads == (threadCounter + 1)) {
            for (int joinedEdge = (edgesPerThread * threadCounter); joinedEdge < (edgesPerThread * (threadCounter) + modulo); joinedEdge++) {
                // Determine the joined edge neighbour of the current node
                int neighbourIndex = currentNodeEdges.get(joinedEdge).getNeighbourIndex(currentNode);

                // Only interested in an unvisited neighbour
                if (!this.nodes[neighbourIndex].isVisited()) {
                    // Calculate the tentative distance for the neighbour
                    int tentative = this.nodes[currentNode].getDistanceFromSource() + currentNodeEdges.get(joinedEdge).getLength();
                    // Overwrite if the tentative distance is less than what's currently stored
                    if (tentative < nodes[neighbourIndex].getDistanceFromSource()) {
                        nodes[neighbourIndex].setDistanceFromSource(tentative);
                    }
                }
            }
        }
        // All neighbours are checked so this node is now visited
        nodes[currentNode].setVisited(true);
    }

Спасибо за помощь!

Ответы [ 3 ]

2 голосов
/ 10 июля 2019

Вот простая демонстрация использования CountDownLatch для ожидания всех потоков в пуле:

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class WaitForAllThreadsInPool {

    private static int MAX_CYCLES = 10;

    public static void main(String args[]) throws InterruptedException, IOException {
        new WaitForAllThreadsInPool().apply(4);
    }

    public void apply(int numberOfThreads) {

        ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);
        CountDownLatch cdl = new CountDownLatch(numberOfThreads);

        class DijkstraTask implements Runnable {

            private final String name;
            private final CountDownLatch cdl;
            private final Random rnd = new Random();

            public DijkstraTask(String name, CountDownLatch cdl) {
                this.name = name;
                this.cdl = cdl;
            }

            @Override
            public void run() {
                calculateShortestDistances(1+ rnd.nextInt(MAX_CYCLES), cdl, name);
            }
        }

        for (int t = 0; t < numberOfThreads; t++) {
            executor.execute(new DijkstraTask("Task " + t, cdl));
        }

        //wait for all threads to finish
        try {
            cdl.await();
            System.out.println("-all done-");
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }

    public void calculateShortestDistances(int numberOfWorkCycles, CountDownLatch cdl, String name) {

        //simulate long process
        for(int cycle = 1 ; cycle <= numberOfWorkCycles; cycle++){
            System.out.println(name + " cycle  "+ cycle + "/"+ numberOfWorkCycles );
            try {
                TimeUnit.MILLISECONDS.sleep(1000);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        }

        cdl.countDown(); //thread finished
    }
}

Пример вывода:

Задание 0 цикл 1/3
Задание 1 цикл 1/2
Задание 3 цикл 1/9
Задание 2, цикл 1/3
Задание 0, цикл 2/3
Задание 1, цикл 2/2
Задание 2, цикл 2/3
Задание 3, цикл 2/9
Задание 0, цикл 3/3
Задание 2, цикл 3/3
Задание 3, цикл 3/9
Задание 3, цикл 4/9
Задание 3, цикл 5/9
Задание 3, цикл 6/9
Задание 3, цикл 7/9
Задание 3, цикл 8/9
Задание 3, цикл 9/9
-все сделано-

2 голосов
/ 10 июля 2019

Вы можете использовать invokeAll :

//Add task for each node
Collection<Callable<Object>> tasks = new ArrayList<>(numberOfThreads);
for (int t = 0; t < numberOfThreads; t++) {
    tasks.add(Executors.callable(new DijkstraTask("Task " + t)));
}

//Wait until finished
executor.invokeAll(tasks);
2 голосов
/ 10 июля 2019

Вы должны посмотреть на CyclicBarrier или CountDownLatch. Оба из них позволяют предотвратить запуск потоков, если другие потоки не сообщили, что они закончили. Разница между ними заключается в том, что CyclicBarrier можно использовать повторно, то есть можно использовать несколько раз, в то время как CountDownLatch является однократным, вы не можете сбросить счетчик.

Перефразируя из Javadocs:

A CountDownLatch - это средство синхронизации, позволяющее одному или нескольким потокам дождаться завершения набора операций, выполняемых в других потоках.

A CyclicBarrier - это средство синхронизации, позволяющее всем потокам набора ожидать друг друга, чтобы достичь общей барьерной точки. CyclicBarriers полезны в программах, включающих группу потоков фиксированного размера, которые иногда должны ждать друг друга. Барьер называется циклическим, поскольку его можно использовать повторно после освобождения ожидающих потоков.

https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/CyclicBarrier.html

https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/CountDownLatch.html

...