Проблема, с которой я столкнулся, заключается в том, что первый набор потоков вращается, но процесс не ждет, пока все потоки не завершат свою работу и все 5000000 строк не будут завершены.
Когдавы выполняете задания, используя ExecutorService
, они добавляются в службу и запускаются в фоновом режиме.Чтобы дождаться их завершения, нужно дождаться завершения службы:
ExecutorService service = Executors.newFixedThreadPool(10);
// submit jobs to the service here
// after the last job has been submitted, we immediately shutdown the service
service.shutdown();
// then we can wait for it to terminate as the jobs run in the background
service.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
Кроме того, если в этих файлах есть дерьмо, я бы порекомендовал использовать ограниченная очередь для заданий, так что вы не выбрасываете память, эффективно кэшируя все строки в файле.Это работает, только если файлы остаются и не исчезают.
// this is the same as a newFixedThreadPool(10) but with a queue of 100
ExecutorService service = new ThreadPoolExecutor(10, 10,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(100));
// set a rejected execution handler so we block the caller once the queue is full
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
});
Запишите эту запись в блок chunkedwriter, который в итоге группирует 10000 строк для обратной записи фрагмента в удаленное местоположение
По завершении каждого задания A, B, C, если его нужно обработать на втором шаге, я бы также рекомендовал изучить ExecutorCompletionService
, который позволяет объединять различные пулы потоков вместе, чтобы получить линиипо окончании они сразу же начнут работать на 2-й фазе обработки.
Если вместо этого chunkedWriter
является просто одним потоком, то я бы рекомендовал разделить BlockingQueue<Result>
и поместить потоки исполнителя в очередькак только строки сделаны, и chunkedWriter
извлекает из очереди и выполняет группирование и запись результатов.В этой ситуации указание потоку записи, что это сделано, должно быть обработано осторожно - возможно, с помощью какой-то постоянной END_RESULT
, помещенной в очередь главным потоком, ожидающим завершения службы.