Как я знаю, когда ExecutorService закончил, если элементы на ES могут повторно передать ES - PullRequest
2 голосов
/ 16 июня 2019

Мое Java-приложение работает с музыкальными файлами в папках, оно предназначено для параллельной и независимой обработки нескольких папок.Для этого каждая папка обрабатывается службой ExecutorService с максимальным размером пула, который не соответствует ни одному из процессоров компьютера.

Например, если у нас компьютер с 8 процессорами, то восемь папок (теоретически) могут быть (теоретически)обрабатывается одновременно, если у нас компьютер с 16 процессорами, то 16 папок могут обрабатываться одновременно.Если у нас только один ЦП, то мы устанавливаем размер пула равным 3, чтобы ЦП продолжал что-то делать, если одна папка заблокирована при вводе-выводе.

Однако на самом деле у нас нет только одного ExecutorService, который мыиметь более одного, потому что каждая папка может пройти несколько этапов.

Process1 (использует ExecutorService1) → Process2 (ExecutorService2) → Process3 (ExecutorService3)

Процесс 1,2,3 и т. д. всереализует Callable, и все они имеют свои собственные связанные ExecutorService.Есть процесс FileLoader, который мы запускаем, и он загружает папки, а затем создает вызываемый для Process1 файл для каждой папки и передает его исполнителю Process1, для каждого вызываемого Process1 он выполняет свою работу и затем передает другому вызываемому объекту, это может быть Process2, Process3ecetera, но мы никогда не идем назад, например, Process3 никогда не подчинится Process1.На самом деле у нас есть 12 процессов, но вряд ли какая-либо конкретная папка будет проходить через все 12 процессов

Но я понял, что это некорректно, потому что в случае 16-CPU компьютера каждый ES можетразмер пула равен 16, поэтому у нас на самом деле запущено 48 потоков, и это приведет к слишком большому количеству конфликтов.

Итак, я собирался, чтобы все процессы (Process1, Process2 ...) использовали один и тот же ExecutorService.Таким образом, мы когда-либо работаем только с рабочими потоками, соответствующими процессорам.

Однако в моей текущей ситуации у нас есть процесс SongLoader, который имеет только одну отправленную задачу (загрузка всех папок), и затем мы вызываем shutdown (), этоне завершится, пока все не будет отправлено в Process0, тогда shutdown () в Process0 не будет выполнена, пока все не будет отправлено в Process1 и т. д.

 //Init Services
 services.add(songLoaderService);
 services.add(Process1.getExecutorService());
 services.add(Process2.getExecutorService());
 services.add(Process3.getExecutorService());

 for (ExecutorService service : services)
     //Request Shutdown
     service.shutdown();

     //Now wait for all submitted tasks to complete
     service.awaitTermination(10, TimeUnit.DAYS);
 }
 //...............
 //Finish Off work

Однако, если все было на тех же ES и Process1отправлял в Process2 это больше не будет работать, потому что во время shutdown () вызывались не все папки, которые Process1 wмог бы отправить в Process2, чтобы он был преждевременно завершен.

Итак, как мне определить, когда все работы были завершены с использованием одного ExecutorService, когда задачи в этой ES могут быть переданы другим задачам в том жеES?

Или есть лучший подход?

Примечание , вы можете подумать, почему он просто не объединяет логику Процесса 1,2 и 3 вединый процесс.Сложность состоит в том, что, хотя я изначально группирую песни по папкам, иногда песни разбиваются на более мелкие группы, и они распределяются по отдельным процессам, не связанным с одним и тем же процессом, на самом деле всего 12 процессов.

Попытка, основанная на идее Шолмса

Главный поток

    private static List<Future> futures = Collections.synchronizedList(new ArrayList<Future>());
    private static AnalyserService analyserService = new MainAnalyserService(SongKongThreadGroup.THREAD_WORKER);
    ...
    SongLoader loader = SongLoader.getInstanceOf(parentFolder);
    ExecutorService songLoaderService =  SongLoader.getExecutorService();
    songLoaderService.submit(loader);
    for(Future future : futures)
    {
        try
        {
             future.get();
        }
        catch (InterruptedException ie)
        {
            SongKong.logger.warning(">>>>>> Interrupted - shutting down tasks immediately");
            getAnalyserService().getExecutorService().awaitTermination(30, TimeUnit.SECONDS);
        }
        catch(ExecutionException e)
        {
            SongKong.logger.log(Level.SEVERE, ">>>>>> ExecutionException:"+e.getMessage(), e);
        }
    }
    songLoaderService.shutdown();

С кодом процесса, отправляющим новые задачи с использованием этой функции из MainAnalyserService

public void submit(Callable<Boolean> task) //throws Exception
{
    FixSongsController.getFutures().add(getExecutorService().submit(task));
}

Выглядело, как будто оно работало, но не получилось с

java.util.ConcurrentModificationException
    at java.base/java.util.ArrayList$Itr.checkForComodification(Unknown Source)
    at java.base/java.util.ArrayList$Itr.next(Unknown Source)
    at com.jthink.songkong.analyse.toplevelanalyzer.FixSongsController.start(FixSongsController.java:220)
    at com.jthink.songkong.ui.swingworker.FixSongs.doInBackground(FixSongs.java:49)
    at com.jthink.songkong.ui.swingworker.FixSongs.doInBackground(FixSongs.java:18)
    at java.desktop/javax.swing.SwingWorker$1.call(Unknown Source)
    at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
    at java.desktop/javax.swing.SwingWorker.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)

, и теперь я освобождаю, я не могу открыть поток, вызывающий future.get () (который ждет, покаготово), в то же время другие темы добавляются в список.

Ответы [ 3 ]

2 голосов
/ 17 июня 2019

Я согласен со Shloim, что вам не нужно несколько экземпляров ExecutorService здесь - достаточно одного (с учетом количества доступных процессоров), которое достаточно и фактически оптимально. На самом деле, я думаю, вам не нужно ExecutorService; простой Executor может сделать эту работу, если вы используете внешний механизм полноты сигнализации.

Я бы начал с создания класса, который представлял бы весь большой рабочий элемент. Если вам нужно использовать результаты каждого дочернего рабочего элемента, вы можете использовать очередь, но если вы просто хотите узнать, есть ли работа, которую нужно выполнить, вам нужен только счетчик.

Например, вы можете сделать что-то вроде этого:

public class FolderWork implements Runnable {
    private final Executor executor;
    private final File folder;

    private int pendingItems;  // guarded by monitor lock on this instance

    public FolderWork(Executor executor, File folder) {
        this.executor = executor;
        this.folder = folder;
    }

    @Override
    public void run() {
        for (File file : folder.listFiles()) {
            enqueueMoreWork(file);
        }
    }

    public synchronized void enqueueMoreWork(File file) {
        pendingItems++;
        executor.execute(new FileWork(file, this));
    }

    public synchronized void markWorkItemCompleted() {
        pendingItems--;
        notifyAll();
    }

    public synchronized boolean hasPendingWork() {
        return pendingItems > 0;
    }

    public synchronized void awaitCompletion() {
       while (pendingItems > 0) {
           wait();
       }
    }
}

public class FileWork implements Runnable {
    private final File file;
    private final FolderWork parent;

    public FileWork(File file, FolderWork parent) {
        this.file = file;
        this.parent = parent;
    }

    @Override
    public void run() {
        try {
           // do some work with the file

           if (/* found more work to do */) {
               parent.enqueueMoreWork(...);
           }
        } finally {
            parent.markWorkItemCompleted();
        }
    }
}

Если вы беспокоитесь о накладных расходах синхронизации для счетчика pendingItems, вы можете использовать вместо него AtomicInteger. Тогда вам нужен отдельный механизм для уведомления ожидающего потока, что мы закончили; например, вы можете использовать CountDownLatch. Вот пример реализации:

public class FolderWork implements Runnable {
    private final Executor executor;
    private final File folder;

    private final AtomicInteger pendingItems = new AtomicInteger(0);
    private final CountDownLatch latch = new CountDownLatch(1);

    public FolderWork(Executor executor, File folder) {
        this.executor = executor;
        this.folder = folder;
    }

    @Override
    public void run() {
        for (File file : folder.listFiles()) {
            enqueueMoreWork(file);
        }
    }

    public void enqueueMoreWork(File file) {
        if (latch.getCount() == 0) {
            throw new IllegalStateException(
                "Cannot call enqueueMoreWork() again after awaitCompletion() returns!");
        }
        pendingItems.incrementAndGet();
        executor.execute(new FileWork(file, this));
    }

    public void markWorkItemCompleted() {
        int remainingItems = pendingItems.decrementAndGet();
        if (remainingItems == 0) {
            latch.countDown();
        }
    }

    public boolean hasPendingWork() {
        return pendingItems.get() > 0;
    }

    public void awaitCompletion() {
       latch.await();
    }
}

Вы бы назвали это так:

Executor executor = Executors.newCachedThreadPool(...);
FolderWork topLevel = new FolderWork(executor, new File(...));
executor.execute(topLevel);
topLevel.awaitCompletion();

В этом примере показан только один уровень дочерних рабочих элементов, но вы можете использовать любое количество дочерних рабочих элементов, если они все используют один и тот же счетчик pendingItems, чтобы отслеживать, сколько работы осталось сделать.

1 голос
/ 16 июня 2019

Не shutdown() ExecutorService.Вместо этого создайте Callable объекты и сохраните Future объекты, которые они создают.Теперь вы можете ждать на Future объектах вместо ожидания на ExecutorService.Обратите внимание, что теперь вам придется ждать каждого будущего объекта в отдельности, но если вам нужно только знать, когда заканчивается последний объект, вы можете просто выполнить итерацию по ним в любом заданном порядке и вызвать get().

Любая задача может отправлять больше задач, и ей необходимо убедиться, что ее будущий объект помещен в очередь, которая будет отслеживаться вашим основным потоком.

// put these somewhere public
ConcurrentLinkedQueue<Future<Boolean>> futures = new ConcurrentLinkedQueue<Future<Boolean>>();
ExecutorService executor = ...

void submit(Callable<Boolean> c) {
    futures.add(executor.submit(c));
}

Теперь ваш основной поток может начать отправлять задачи и ждатьвсе задачи и подзадачи:

void mainThread() {
    // add some tasks from main thread
   for(int i=0 ; i<N ; ++i){
        Callable<Boolean> callable = new Callable<Boolean>() {
            @Override
            public Boolean call() throws Exception {
                ...
            }
        submit(callable);
    }

    Future<Boolean> head = null;
    while((head=futures.poll()) != null){
        try {
            head.get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
    // At this point, all of your tasks are complete including subtasks.
    executor.shutdown();
    executor.awaitTermination(); // should return almost immediately
}
0 голосов
/ 17 июня 2019

Это по сути решение @DanielPrydens, но я немного помассировал его, чтобы он более четко показал, как решить мою конкретную проблему

Создан новый класс MainAnalyserService , который обрабатывает создание ExecutorService и предоставляет возможность подсчитывать, когда отправляются новые вызываемые задачи и когда они завершены

public class MainAnalyserService 
{
    public static final int MIN_NUMBER_OF_WORKER_THREADS = 3;
    protected static int BOUNDED_QUEUE_SIZE = 100;

    private final AtomicInteger pendingItems = new AtomicInteger(0);
    private final CountDownLatch latch = new CountDownLatch(1);

    private static final int TIMEOUT_PER_TASK = 30;

    protected  ExecutorService      executorService;

    protected String threadGroup;

    public MainAnalyserService(String threadGroup)
    {
       this.threadGroup=threadGroup;
       initExecutorService();
    }

    protected void initExecutorService()
    {
        int workerSize = Runtime.getRuntime().availableProcessors();
        //Even if only have single cpu we still have multithread so we dont just have single thread waiting on I/O
        if(workerSize< MIN_NUMBER_OF_WORKER_THREADS)
        {
            workerSize = MIN_NUMBER_OF_WORKER_THREADS;
        }

        executorService = new TimeoutThreadPoolExecutor(workerSize,
                new SongKongThreadFactory(threadGroup),
                new LinkedBlockingQueue<Runnable>(BOUNDED_QUEUE_SIZE),
                TIMEOUT_PER_TASK,
                TimeUnit.MINUTES);
    }

    public void submit(Callable<Boolean> task) //throws Exception
    {
        executorService.submit(task);
        pendingItems.incrementAndGet();
    }

    public void workDone()
    {
        int remainingItems = pendingItems.decrementAndGet();
        if (remainingItems == 0)
        {
            latch.countDown();
        }
    }

    public void awaitCompletion() throws InterruptedException{
        latch.await();
    }
}

В потоке FixSongsController у нас есть

analyserService = new MainAnalyserService(THREAD_WORKER);

//SongLoader uses CompletionService when calls LoadFolderWorkers so shutdown wont return until all initial folder submissions completed
ExecutorService songLoaderService = SongLoader.getExecutorService();
songLoaderService.submit(loader);
songLoaderService.shutdown();

//Wait for all aysnc tasks to complete
analyserService.awaitCompletion();

Затем любой Вызываемый (такой как Process1, Process2 и т. Д.) Вызывает submit () для отправки нового Callable на ExecutorService , а затем он должен вызвать workDone () после завершения, поэтому для обеспечения этого я добавляю в блок finally в вызове () каждого метода класса Process

например

public Boolean call() 
{
    try
    {
        //do stuff
        //Possibly make multiple calls to                      
        FixSongsController.getAnalyserService().submit();
    }
    finally
    {
        FixSongsController.getAnalyserService().workDone();
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...