РЕДАКТИРОВАТЬ: Спасибо, Марк, и для тех, у кого похожая проблема, моя проблема заключалась в том, что я сначала делал экземпляр Thread для запускаемого класса, а затем отправлял поток в службу executorservice.
Это помогло Я понимаю, что на самом деле, когда я использую ExecutorService, если есть необработанное исключение; он не сообщит вам, он отменит процесс, без уведомления. Вот почему я получаю незавершенную обработку.
У меня есть ArrayList объектов, которые я хочу обрабатывать партиями в многопоточном режиме, но ограничиваю количество потоков, запущенных в данный момент времени. Я обнаружил, что ExecutorService может справиться с этим. Но после проверки, обрабатывает ли она каждую запись, кажется, что она обрабатывает только очень небольшую часть объектов, которые я ей передаю.
РЕДАКТИРОВАТЬ: я удалил многопоточную часть, и обрабатывал объекты как обычно без использования службы executor, в небольшой партии (только 710) он работает нормально; Есть ли вероятность того, что потоки завершаются слишком быстро и обрабатываются неправильно? Это обычно позволяет обрабатывать около 300 000-800 записей одновременно; вот почему я хотел бы многопоточность.
public void processContainerRecords(ArrayList<? extends ContainerRecord> records) {
int cores = Runtime.getRuntime().availableProcessors();
ExecutorService executor = Executors.newFixedThreadPool(cores);
int batchSize = Settings.LOGIC_BATCH_SIZE;//100
int batches = (int) Math.ceil((double) records.size() / (double) batchSize);
ArrayList<Future<?>> threads = new ArrayList<Future<?>>();
LogicProcessor newHandler = null;
for (int startIndex = 0; startIndex < records.size(); startIndex += batchSize + 1) {
if (records.size() < batchSize) {
newHandler = new LogicProcessor(mainGUI, records.subList(startIndex, records.size()));
} else {
int bound = (startIndex + batchSize);
if (bound > records.size()) {
bound = records.size();
}
newHandler = new LogicProcessor(mainGUI, records.subList(startIndex, bound));
}
Thread newThread = new Thread(newHandler);
Future<?> f = executor.submit(newThread);
threads.add(f);
}
executor.shutdown();
int completedThreads = 0;
while (!executor.isTerminated()) {//monitors threads and waits until completion
completedThreads = 0;
for (Future<?> f : threads) {
if (f.isDone()) {
completedThreads++;
}
}
//currentProgress = completedThreads;
}
for (ContainerRecord record : records) {//checks if each record has been processed
System.out.println(record.getContainer() + ":" + record.isTouched());
}
}
Это класс LogicProcessor, который запускает экземпляры потока
private List<? extends ContainerRecord> archive;
private GUI mainGUI;
public LogicProcessor(GUI mainGUI, List<? extends ContainerRecord> records) {
this.mainGUI = mainGUI;
this.archive = records;
}
@Override
public void run() {
handleLogic();
}
private void handleLogic() {
Iterator iterator = archive.iterator();
while (iterator.hasNext()) {
ContainerRecord record = (ContainerRecord) iterator.next();
record.touch();//sets a boolean in the object to validate if it has been processed yet.
}
}
Вывод: из 710 записей (объектов) обработано, 691 никогда не обрабатывалось / не трогалось, и только 19 имеют.
Что с этим не так? Я много чего пытался сделать, даже создав массив класса LogicProcessor и сохранив экземпляры в массиве, чтобы избежать какого-либо G C удаления экземпляра. Я не уверен, почему он не обрабатывает эти записи.