Вот упрощенная версия моего приложения, показывающая, что я делаю.
/*
in my app's main():
Runner run = new Runner();
run.dowork();
*/
class Runner
{
private int totalWorkers = 2;
private int workersDone = 0;
public synchronized void workerDone()
{
workersDone++;
notifyAll();
}
public synchronized void dowork()
{
workersDone = 0;
//<code for opening a file here, other setup here, etc>
Worker a = new Worker(this);
Worker b = new Worker(this);
while ((line = reader.readLine()) != null)
{
//<a large amount of processing on 'line'>
a.setData(line);
b.setData(line);
while (workersDone < totalWorkers)
{
wait();
}
}
}
}
class Worker implements Runnable
{
private Runner runner;
private String data;
public Worker(Runner r)
{
this.runner = r;
Thread t = new Thread(this);
t.start();
}
public synchronized void setData(String s)
{
this.data = s;
notifyAll();
}
public void run
{
while (true)
{
synchronized(this)
{
wait();
//<do work with this.data here>
this.runner.workerDone();
}
}
}
}
Основная концепция здесь заключается в том, что у меня есть группа работников, которые все выполняют некоторую обработку входящей строки данных, независимо друг от друга, и записывают данные в любое удобное для них место - им не нужно сообщать какие-либо данные обратно Основной поток или обмениваться данными друг с другом.
Проблема в том, что этот код блокируется. Я читаю файл, содержащий более 1 миллиона строк, и мне повезло, что в него было вставлено 100 строк, прежде чем мое приложение перестало отвечать.
В действительности все рабочие выполняют разный объем работы, поэтому я хочу подождать, пока все они завершат, прежде чем перейти к следующей строке.
Я не могу позволить рабочим обрабатывать на разных скоростях и ставить в очередь данные внутри, потому что файлы, которые я обрабатываю, слишком велики для этого и не помещаются в памяти.
Я не могу дать каждому работнику свой собственный FileReader для независимого получения «строки», потому что я выполняю тонну обработки в строке до того, как рабочие ее увидят, и не хочу повторной обработки в каждом работнике.
Я знаю, что мне не хватает некоторого довольно простого аспекта синхронизации в Java, но я застрял на этом этапе. Если бы кто-то мог объяснить, что я делаю здесь неправильно, я был бы признателен. Я полагаю, что неправильно понимаю некоторые аспекты синхронизации, но у меня нет идей для попытки исправить это.