Правильная синхронизация потоков Java с использованием wait / notifyAll? - PullRequest
1 голос
/ 09 декабря 2008

Вот упрощенная версия моего приложения, показывающая, что я делаю.

/*
in my app's main():

    Runner run = new Runner();

    run.dowork();

*/

class Runner
{
    private int totalWorkers = 2;
    private int workersDone = 0;

    public synchronized void workerDone()
    {
        workersDone++;
        notifyAll();
    }

    public synchronized void dowork()
    {
        workersDone = 0;

        //<code for opening a file here, other setup here, etc>

        Worker a = new Worker(this);
        Worker b = new Worker(this);

        while ((line = reader.readLine()) != null)
        {
            //<a large amount of processing on 'line'>

            a.setData(line);
            b.setData(line);

            while (workersDone < totalWorkers)
            {
                wait();
            }               
        }
    }
}

class Worker implements Runnable
{
    private Runner runner;
    private String data;

    public Worker(Runner r)
    {
        this.runner = r;
        Thread t = new Thread(this);
        t.start();
    }

    public synchronized void setData(String s)
    {
        this.data = s;
        notifyAll();
    }

    public void run
    {
        while (true)
        {
            synchronized(this)
            {
                wait();

                //<do work with this.data here>

                this.runner.workerDone();
            }
        }
    }
}

Основная концепция здесь заключается в том, что у меня есть группа работников, которые все выполняют некоторую обработку входящей строки данных, независимо друг от друга, и записывают данные в любое удобное для них место - им не нужно сообщать какие-либо данные обратно Основной поток или обмениваться данными друг с другом.

Проблема в том, что этот код блокируется. Я читаю файл, содержащий более 1 миллиона строк, и мне повезло, что в него было вставлено 100 строк, прежде чем мое приложение перестало отвечать.

В действительности все рабочие выполняют разный объем работы, поэтому я хочу подождать, пока все они завершат, прежде чем перейти к следующей строке.

Я не могу позволить рабочим обрабатывать на разных скоростях и ставить в очередь данные внутри, потому что файлы, которые я обрабатываю, слишком велики для этого и не помещаются в памяти.

Я не могу дать каждому работнику свой собственный FileReader для независимого получения «строки», потому что я выполняю тонну обработки в строке до того, как рабочие ее увидят, и не хочу повторной обработки в каждом работнике.

Я знаю, что мне не хватает некоторого довольно простого аспекта синхронизации в Java, но я застрял на этом этапе. Если бы кто-то мог объяснить, что я делаю здесь неправильно, я был бы признателен. Я полагаю, что неправильно понимаю некоторые аспекты синхронизации, но у меня нет идей для попытки исправить это.

Ответы [ 2 ]

3 голосов
/ 09 декабря 2008

Работать напрямую с synchronized, wait() и notify() определенно сложно.

К счастью, API параллелизма Java предоставляет несколько превосходных объектов управления для такого рода вещей, которые намного более интуитивны. В частности, посмотрите на CyclicBarrier и CountDownLatch; один из них почти наверняка будет тем, что вы ищете.

Вы также можете найти ThreadPoolExecutor для этой ситуации.

Вот простой пример / преобразование вашего фрагмента, который выдает следующий результат (конечно, без тупика):

Считать строку: строка 1
Ожидание завершения работ по линии: линия 1
Работа на линии: Линия 1
Работа на линии: Линия 1
Чтение строки: строка 2
Ожидание завершения работ по линии: строка 2
Работа на линии: Линия 2
Работа на линии: Линия 2
Читать строку: Строка 3
Ожидание завершения работ по линии: строка 3
Работа на линии: Линия 3
Работа на линии: Линия 3
Все работы завершены!

public class Runner
{

    public static void main(String args[]) {
        Runner r = new Runner();
        try {
            r.dowork();
        } catch (IOException e) {
            // handle
            e.printStackTrace();
        }
    }

    CyclicBarrier barrier;
    ExecutorService executor;
    private int totalWorkers = 2;

    public Runner() {
        this.barrier = new CyclicBarrier(this.totalWorkers + 1);
        this.executor = Executors.newFixedThreadPool(this.totalWorkers);
    }

    public synchronized void dowork() throws IOException
    {
        //<code for opening a file here, other setup here, etc>
        //BufferedReader reader = null;
        //String line;

        final Worker worker = new Worker();

        for(String line : new String[]{"Line 1", "Line 2", "Line 3"})
        //while ((line = reader.readLine()) != null)
        {
            System.out.println("Read line: " + line);
            //<a large amount of processing on 'line'>

            for(int c = 0; c < this.totalWorkers; c++) {
                final String curLine = line;
                this.executor.submit(new Runnable() {
                    public void run() {
                        worker.doWork(curLine);
                    }
                });
            }

            try {
                System.out.println("Waiting for work to be complete on line: " + line);
                this.barrier.await();
            } catch (InterruptedException e) {
                // handle
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                // handle
                e.printStackTrace();
            }
        }

        System.out.println("All work complete!");
    }

    class Worker
    {
        public void doWork(String line)
        {
            //<do work with this.data here>
            System.out.println("Working on line: " + line);

            try {
                Runner.this.barrier.await();
            } catch (InterruptedException e) {
                // handle
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                // handle
                e.printStackTrace();
            }
        }
    }    
}
0 голосов
/ 09 декабря 2008

ИМХО вы неправильно разместили "workerDone = 0".

public synchronized void dowork()
        {
                // workersDone = 0;

                //<code for opening a file here, other setup here, etc>

                Worker a = new Worker(this);
                Worker b = new Worker(this);

                while ((line = reader.readLine()) != null)
                {
                        workersDone = 0;

                        //<a large amount of processing on 'line'>

                        a.setData(line);
                        b.setData(line);

                        while (workersDone < totalWorkers)
                        {
                                wait();
                        }                               
                }
        }
...