Координация Java Threadpool с условиями - PullRequest
1 голос
/ 20 апреля 2020

У меня есть пул потоков, который застревает. Программа ниже создает экземпляр ThreadLocks и передает в 1D массив целочисленных значений, которые действуют как «задачи».

Массив 1d представляет количество задач, которые необходимо выполнить. (EX: aValues = [100,200,300]) - это 100 задач, 200 задач и 300 задач. Каждый "Task layer" в предыдущем индексе должен быть завершен до того, как программа перейдет к исходному индексу задач. Таким образом, 100 задач в индексе 0 должны быть выполнены до запуска 200 задач в индексе 1.

Моя попытка координировать задачи выполняется с использованием другого массива int равной длины aValues (называемого syncValues). Программа предназначена для добавления 1 к индексу в syncvalues, когда задача по этому индексу завершена. Поэтому, когда все задачи в слое aValues выполнены, индексы aValues и syncValues должны быть одинаковыми, и программа может продолжаться.

Похоже, что это работает, когда количество задач невелико, однако , если количество задач велико программа выглядит так: зависает на неопределенное время .

Как я могу исправить код для большого количества задач?

Я сократил код, чтобы его было легче понять.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;

public class main {

    public static void main(String[] args)
    {
        //Shows the number of tasks needing completion before moving to correct index.
        int values[] = {500,500,500};

        //The object to complete the tasks
        ThreadLocks obAnne = new ThreadLocks(values);

        //runs through the "tasks"
        obAnne.calculate();
    }
}

class ThreadLocks {

    /* Thread Lock for sync */
    public static ReentrantLock lock = new ReentrantLock();

    /* ThreadPool that generates threads */
    private ExecutorService ThreadPool = Executors.newFixedThreadPool(1);

    /* Nodes complete a set of tasks, add 1 to syncValues after task completion*/
    private Node[][] nodes;
    private int[] syncValues;

    /* The array that holds the "tasks" to be completed */
    private int aValues[];


    public ThreadLocks(int[] aValues)
    {
        this.aValues = aValues;
        this.syncValues = new int[aValues.length];
        this.nodes = createNewTopology(aValues);
    }


    /**
    * Create the nodes in a 2d array. these nodes hold the tasks to be created.
     */
    private Node[][] createNewTopology(int[] topology)
    {
        Node[][] tmpTopology = new Node[topology.length][];

        for (int layerX = 0; layerX < topology.length; layerX++)
        {   
            Node[] layer = new Node[topology[layerX]];
            syncValues[layerX] = 0;
            for (int layerY = 0; layerY < topology[layerX]; layerY++)
            {
                    layer[layerY] = new Node(layerX);
            }
            tmpTopology[layerX] = layer;
        }
        return tmpTopology;     
    }

    /* Process Inputs, produce outputs */
    public void calculate()
    {
        /* starts a time for dubug purposes */
        long Time = System.currentTimeMillis();
        /* starts at index 1 as index 0 are inputs */
        for(int layerX = 1; layerX < nodes.length; layerX++)
        {           
            for (int layerY = 0; layerY < nodes[layerX].length; layerY++)
            {
                ThreadPool.execute(nodes[layerX][layerY]);
            }

            while(syncValues[layerX] < aValues[layerX]) {}; 
        }
        /*tracks time taken.. its how i know its done */
        System.out.print("Current Time :" + (System.currentTimeMillis() - Time));
    }


    private class Node implements Runnable
    {
        /* sets the node layer. describes what index of aValues the node is in.*/
        private int layerX;

        public Node(int layerX) {
            this.layerX = layerX;
        }

        /* Processes Data for Node on separate thread */
        @Override
        public void run()
        {   
            /* TASK WOULD BE PUT HERE */

            lock.lock();
            try {syncValues[layerX]++;}
            catch (Exception e){System.out.print("LockError");}
            finally{lock.unlock();}
        }
    }
}

1 Ответ

1 голос
/ 20 апреля 2020

Почему бы не использовать AtomicInteger? он обрабатывает параллелизм для вас без необходимости писать код синхронизации.

...