Java Thread Pools / Executor Service и wait () - что происходит с потоками и очередью задач? - PullRequest
17 голосов
/ 01 марта 2012

Я осмотрелся, но не нашел ответа, поэтому хотел наверняка это подтвердить.

Скажем, у меня есть пул потоков фиксированного размера - ExecutorService pool = Executors.newFixedThreadPool(5);

И у меня есть некоторый код:

pool.execute(new Runnable(){
    try{
        Object waitForMe = doSomethingAndGetObjectToWaitFor();
        waitForMe.wait();
        doSomethingElse();
    }catch(Exception e){ throw new RunTimeException(e) } 

});

Предположим, что приведенный выше код называется несколькими 100раз.В пуле только 5 потоков (поэтому только 5 из приведенных выше операторов должны быть активны в одной точке).Также предположим, что wait() находится на объекте, который выполняет некоторые вызовы ввода / вывода для третьей стороны и ожидает обратного вызова после завершения операции, поэтому, естественно, потребуется некоторое время для его завершения.

Теперь мой вопроскаково поведение, когда одна из этих задач достигает wait(), задача переходит в спящий режим, а затем поток из пула потоков снимает другую задачу с очереди и начинает ее запускать?

Если ожидающая задача переходит в спящий режим, что происходит, когда она получает notify() и просыпается?Возвращается ли поток обратно в очередь (спереди или сзади) для пула потоков и ждет, пока один из 5 потоков сможет продолжить его выполнение (т. Е. Вызов doSomethingelse())?Или поток, который его выполнял, также переходит в спящий режим, т. Е. Один из 5 потоков исполнителя ожидает выполнения задачи (это то, что я предполагаю)?Или поток исполнителя выбирает другую задачу и просто прерывается, когда первая задача возвращается из wait ()?

Ответы [ 4 ]

17 голосов
/ 01 марта 2012

wait() является операцией блокировки:

Заставляет текущий поток ждать, пока другой поток не вызовет метод notify () или notifyAll ()

Это означает, что поток в пуле будет ожидать, но снаружи он просто выглядит так, как будто текущая задача занимает так много времени. Это также означает, что если выполнено 5 задач и все они wait(), Executor не сможет обработать оставшиеся задачи, которые, например, ждут в очереди.

Правда, сам поток-исполнитель переходит в спящий режим, позволяя другим потокам переключаться и использовать ЦП (поэтому вы можете одновременно ожидать сотни потоков, и ваша система все еще реагирует), но поток все еще "непригоден" и заблокирован .

Еще одна интересная функция - прерывание - если поток чего-то ждет или спит, вы можете прервать его. Обратите внимание, что wait() и Thread.sleep() объявляют InterruptedException. С ExecutorService вы можете воспользоваться этим, просто вызвав: future.cancel() (future - это объект, который вы получили взамен при отправке задачи в ExecutorService).

Наконец, я думаю, вам следует изменить свое решение. Вместо активного ожидания завершения работы внешней системы предоставьте API с обратными вызовами:

pool.execute(new Runnable(){
    try{
        doSomethingAndCallMeBackWhenItsDone(new Callback() {
            public void done() {
                doSomethingElse();
            }
        });
    }catch(Exception e){ throw new RunTimeException(e) } 

});

Таким образом, API внешней системы просто уведомит вас, когда будут готовы результаты, и вам не придется ждать и блокировать ExecutorService. Наконец, если doSomethingElse() отнимает много времени, вы можете даже решить запланировать его, а не использовать сторонний поток ввода-вывода:

pool.execute(new Runnable(){
    try{
        doSomethingAndCallMeBackWhenItIsDone(new Callback() {
            public void done() {
                pool.submit(new Callbale<Void>() {
                    public Void call() {
                        doSomethingElse();
                    }
                }
            }
        });
    }catch(Exception e){ throw new RunTimeException(e) } 

});

ОБНОВЛЕНИЕ: вы спрашиваете, что делать с таймаутами? Вот моя идея:

pool.execute(new Runnable(){
    try{
        doSomethingAndCallMeBackWhenItsDone(new Callback() {
            public void done() {
                doSomethingElse();
            }
            public void timeout() {
                //opps!
            }
        });
    }catch(Exception e){ throw new RunTimeException(e) } 

});

Я полагаю, вы можете реализовать тайм-аут на стороне третьей стороны, и если там происходит тайм-аут, просто вызовите метод timeout().

1 голос
/ 01 марта 2012

wait() не может ничего знать о пуле протектора. И пул потоков не может ничего знать о wait(). Поэтому они никак не могут взаимодействовать.

Они работают как обычно - wait() просто является длительной блокирующей операцией, пул потоков - это просто очередь запускаемых программ для запуска в ограниченном пуле потоков.

0 голосов
/ 28 ноября 2015

Все 5 потоков будут заблокированы, и приложение будет в непродуктивном состоянии.

Добавляя к ответу Tomasz, я бы хотел реализовать механизм тайм-аута следующим образом.

            Future<Long> futureResult = service.execute(myCallable);
            Long result = null;
            try{
                result = futureResult.get(5000, TimeUnit.MILLISECONDS);
            }catch(TimeoutException e){
                System.out.println("Time out after 5 seconds");
                futureResult.cancel(true);
            }catch(InterruptedException ie){
                System.out.println("Error: Interrupted");
            }catch(ExecutionException ee){
                System.out.println("Error: Execution interrupted");
            }

Помимо TimeoutExceptionВы можете отменить Future во время InterruptedException & ExecutionException.Если вы используете submit () вместо execute (), InterruptedException & ExecutionException будет проглочен в самой среде.

0 голосов
/ 04 января 2014

Я бы прокомментировал ответ Томаша, но моя репутация не позволяет этого (пока), извините.

Я знаю, что вопрос старый, но для людей, которые все еще читают эту страницу, естьпосмотрите на Future и особенно на ListenableFuture в guava, который позволяет регистрировать обратные вызовы и связывать будущее вместе, в частности, с целью не блокировать ваш поток (и, таким образом, освободить поток обратно в пул для использования его другим)..

...