Мое Java-приложение работает с музыкальными файлами в папках, оно предназначено для параллельной и независимой обработки нескольких папок.Для этого каждая папка обрабатывается службой ExecutorService с максимальным размером пула, который не соответствует ни одному из процессоров компьютера.
Например, если у нас компьютер с 8 процессорами, то восемь папок (теоретически) могут быть (теоретически)обрабатывается одновременно, если у нас компьютер с 16 процессорами, то 16 папок могут обрабатываться одновременно.Если у нас только один ЦП, то мы устанавливаем размер пула равным 3, чтобы ЦП продолжал что-то делать, если одна папка заблокирована при вводе-выводе.
Однако на самом деле у нас нет только одного ExecutorService, который мыиметь более одного, потому что каждая папка может пройти несколько этапов.
Process1 (использует ExecutorService1) → Process2 (ExecutorService2) → Process3 (ExecutorService3)
Процесс 1,2,3 и т. д. всереализует Callable, и все они имеют свои собственные связанные ExecutorService.Есть процесс FileLoader, который мы запускаем, и он загружает папки, а затем создает вызываемый для Process1 файл для каждой папки и передает его исполнителю Process1, для каждого вызываемого Process1 он выполняет свою работу и затем передает другому вызываемому объекту, это может быть Process2, Process3ecetera, но мы никогда не идем назад, например, Process3 никогда не подчинится Process1.На самом деле у нас есть 12 процессов, но вряд ли какая-либо конкретная папка будет проходить через все 12 процессов
Но я понял, что это некорректно, потому что в случае 16-CPU компьютера каждый ES можетразмер пула равен 16, поэтому у нас на самом деле запущено 48 потоков, и это приведет к слишком большому количеству конфликтов.
Итак, я собирался, чтобы все процессы (Process1, Process2 ...) использовали один и тот же ExecutorService.Таким образом, мы когда-либо работаем только с рабочими потоками, соответствующими процессорам.
Однако в моей текущей ситуации у нас есть процесс SongLoader, который имеет только одну отправленную задачу (загрузка всех папок), и затем мы вызываем shutdown (), этоне завершится, пока все не будет отправлено в Process0, тогда shutdown () в Process0 не будет выполнена, пока все не будет отправлено в Process1 и т. д.
//Init Services
services.add(songLoaderService);
services.add(Process1.getExecutorService());
services.add(Process2.getExecutorService());
services.add(Process3.getExecutorService());
for (ExecutorService service : services)
//Request Shutdown
service.shutdown();
//Now wait for all submitted tasks to complete
service.awaitTermination(10, TimeUnit.DAYS);
}
//...............
//Finish Off work
Однако, если все было на тех же ES и Process1отправлял в Process2 это больше не будет работать, потому что во время shutdown () вызывались не все папки, которые Process1 wмог бы отправить в Process2, чтобы он был преждевременно завершен.
Итак, как мне определить, когда все работы были завершены с использованием одного ExecutorService, когда задачи в этой ES могут быть переданы другим задачам в том жеES?
Или есть лучший подход?
Примечание , вы можете подумать, почему он просто не объединяет логику Процесса 1,2 и 3 вединый процесс.Сложность состоит в том, что, хотя я изначально группирую песни по папкам, иногда песни разбиваются на более мелкие группы, и они распределяются по отдельным процессам, не связанным с одним и тем же процессом, на самом деле всего 12 процессов.
Попытка, основанная на идее Шолмса
Главный поток
private static List<Future> futures = Collections.synchronizedList(new ArrayList<Future>());
private static AnalyserService analyserService = new MainAnalyserService(SongKongThreadGroup.THREAD_WORKER);
...
SongLoader loader = SongLoader.getInstanceOf(parentFolder);
ExecutorService songLoaderService = SongLoader.getExecutorService();
songLoaderService.submit(loader);
for(Future future : futures)
{
try
{
future.get();
}
catch (InterruptedException ie)
{
SongKong.logger.warning(">>>>>> Interrupted - shutting down tasks immediately");
getAnalyserService().getExecutorService().awaitTermination(30, TimeUnit.SECONDS);
}
catch(ExecutionException e)
{
SongKong.logger.log(Level.SEVERE, ">>>>>> ExecutionException:"+e.getMessage(), e);
}
}
songLoaderService.shutdown();
С кодом процесса, отправляющим новые задачи с использованием этой функции из MainAnalyserService
public void submit(Callable<Boolean> task) //throws Exception
{
FixSongsController.getFutures().add(getExecutorService().submit(task));
}
Выглядело, как будто оно работало, но не получилось с
java.util.ConcurrentModificationException
at java.base/java.util.ArrayList$Itr.checkForComodification(Unknown Source)
at java.base/java.util.ArrayList$Itr.next(Unknown Source)
at com.jthink.songkong.analyse.toplevelanalyzer.FixSongsController.start(FixSongsController.java:220)
at com.jthink.songkong.ui.swingworker.FixSongs.doInBackground(FixSongs.java:49)
at com.jthink.songkong.ui.swingworker.FixSongs.doInBackground(FixSongs.java:18)
at java.desktop/javax.swing.SwingWorker$1.call(Unknown Source)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at java.desktop/javax.swing.SwingWorker.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
, и теперь я освобождаю, я не могу открыть поток, вызывающий future.get () (который ждет, покаготово), в то же время другие темы добавляются в список.