Простой асинхронный ввод / вывод: много потоков, один файл - PullRequest
3 голосов
/ 10 июля 2011

У меня есть научное приложение, которое я обычно запускаю параллельно с xargs, но эта схема сопряжена с повторными затратами на запуск JVM и не учитывает ввод-вывод кэшированных файлов и компилятор JIT.Я уже адаптировал код для использования пула потоков, но я застрял на том, как сохранить мой вывод.

Программа (то есть один поток новой программы) читает два файла, выполняет некоторую обработку изатем выводит результат на стандартный вывод.В настоящее время я имел дело с выводом, когда каждый поток добавляет свою строку результата к BlockingQueue.Другой поток берет из очереди и записывает в файл, при условии, что логический флаг имеет значение true.Затем я awaitTermination и устанавливаю флаг в значение false, вызывая закрытие файла и выход из программы.

Мое решение кажется немного глупым;Какой самый простой и лучший способ сделать это?Как мне записать первичные данные результатов из множества потоков в один файл?

Ответ не обязательно должен быть специфичным для Java, если это, например, широко применимый метод.

Обновление

Я использую "СТОП" в качестве отравляющей таблетки.

while (true) {
    String line = queue.take();
    if (line.equals("STOP")) {
        break;
    } else {
        output.write(line);
    }
}
output.close();

Я вручную запускаю поток, потребляющий очередь, затем добавляю задания в потокпул, дождитесь завершения заданий и, наконец, отравите очередь и присоединитесь к потоку потребителя.

Ответы [ 5 ]

4 голосов
/ 10 июля 2011

Это действительно так, как вы хотите, чтобы потоки помещали свои выходные данные в очередь, а затем писатель исчерпывал их.

Единственное, что вы можете сделать, чтобы сделать вещи немного чище, - это не проверять флаг, а просто поместить токен «все готово» в очередь, которую писатель может использовать, чтобы знать, что она закончена. Таким образом, нет необходимости в внеполосной сигнализации.

Это тривиально, вы можете использовать хорошо известную строку, перечисление или просто общий объект.

2 голосов
/ 10 июля 2011

Вы можете использовать ExecutorService . Отправьте Callable , который будет выполнять задачу, и верните строку после завершения.

При отправке Callable вы получаете Future , сохраняйте эти ссылки, например. в списке.

Затем просто переберите Future s и получите строки, вызвав Future # get . Это будет блокировать до тех пор, пока задача не будет завершена, если это еще не сделано, в противном случае немедленно верните значение.

Пример:

ExecutorService exec = Executors.newFixedThreadPool(10);
List<Future<String>> tasks = new ArrayList<Future<String>>();
tasks.add(exec.submit(new Callable<String> {
    public String call() {
       //do stuff
       return <yourString>;
    }
}));

//and so on for the other tasks

for (Future<String> task : tasks) {
    String result = task.get();
    //write to output
}
1 голос
/ 10 июля 2011

Хорошая стратегия - обработка множества потоков, запись одного потока и очередь сообщений между ними. Вопрос, который просто нужно решить, это знать, когда вся работа закончена. Один из способов сделать это - подсчитать, сколько рабочих потоков вы запустили, а затем подсчитать, сколько ответов вы получили. Примерно такой псевдокод:

int workers = 0
for each work item {
   workers++
   start the item's worker in a separate thread
}
while workers > 0 {
   take worker's response from a queue
   write response to file
   workers--
}

Этот подход также работает, если рабочие могут находить больше рабочих элементов во время выполнения. Просто включите любую дополнительную еще не обработанную работу в ответы рабочих, а затем увеличьте число рабочих и запустите рабочие потоки, как обычно.

Если каждый из работников возвращает только одно сообщение, вы можете использовать Java ExecutorService для выполнения экземпляров Callable, которые возвращают результат. Методы ExecutorService предоставляют доступ к экземплярам Future, из которых вы можете получить результат, когда Callable завершит свою работу.

Таким образом, вы должны сначала отправить все задачи в ExecutorService, а затем перебрать все фьючерсы и получить их ответы. Таким образом, вы будете писать ответы в том порядке, в котором вы проверяете фьючерсы, которые могут отличаться от порядка, в котором они заканчивают свою работу. Если задержка не важна, это не должно быть проблемой. В противном случае очередь сообщений (как упомянуто выше) может быть более подходящей.

0 голосов
/ 10 июля 2011

Если у вас много потоков, записывающих в один и тот же файл, самое простое - записать этот файл в задании.

final PrintWriter out = 
ExecutorService es =
for(int i=0;i<tasks;i++)
    es.submit(new Runnable() {
        public void run() {
            performCalculations();
            // so only one thread can write to the file at a time.
            synchornized(out) {
                writeResults(out);
            }
        }
    });
 es.shutdown();
 es.awaitTermination(1, TimeUnit.HOUR);
 out.close();
0 голосов
/ 10 июля 2011

Непонятно, имеет ли ваш выходной файл какой-то определенный порядок или вы просто сбрасываете туда свои данные. Я предполагаю, что это не имеет порядка.

Я не понимаю, зачем вам нужен дополнительный поток для записи в вывод. Просто synchronized метод, который записывает в файл и вызывает его в конце каждого потока.

...