Как правило, вы не можете прерывать потоки ExecutorService
из внешнего кода, если вы использовали ExecutorService::execute(Runnable)
для запуска потоков, потому что внешний код не имеет ссылки на Thread
объекты каждого из работающих темы (см. конец этого ответа для решения, если вам нужно ExecutorService::execute
). Однако если вы вместо этого используете ExecutorService::submit(Callable<T>)
для отправки заданий, вы получаете Future<T>
, который внутренне сохраняет ссылку на работающий поток, как только Callable::call()
начинает выполнение. Этот поток может быть прерван с помощью вызова Future::cancel(true)
. Любой код внутри (или вызываемый) Callable
, который проверяет состояние прерывания текущего потока, может поэтому быть прерван через ссылку Future
. Это включает BlockingQueue::take()
, который, даже если он заблокирован, будет реагировать на прерывание потока. (Методы блокировки JRE обычно просыпаются, если их прерывают, когда блокируются, понимают, что они были прерваны, и выдают InterruptedException
.)
Подводя итог: Future::cancel()
и Future::cancel(true)
оба отменяют будущую работу , в то время как Future::cancel(true)
также прерывает текущую работу (до тех пор, пока текущая работа реагирует на прерывание потока) , Ни один из двух вызовов cancel
не влияет на работу, которая уже успешно завершена.
Обратите внимание, что после прерывания потока из-за отмены в поток будет добавлено InterruptException
(например, BlockingQueue::take()
в этом случае). Тем не менее, вы CancellationException
будете отброшены в главном потоке при следующем вызове Future::get()
на успешно отмененном Future
(то есть на Future
, который был отменен до его завершения). Это отличается от того, что вы обычно ожидаете: если неотмененный Callable
выбрасывает InterruptedException
, следующий вызов Future::get()
будет выбрасывать InterruptedException
, но если отмененный Callable
выбрасывает InterruptedException
, следующий позвонить на Future::get()
через CancellationException
.
Вот пример, который иллюстрирует это:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
public class Test {
public static void main(String[] args) throws Exception {
// Start Executor with 4 threads
int numThreads = 4;
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(numThreads);
try {
// Set up BlockingQueue for inputs, and List<Future> for outputs
BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
List<Future<String>> futures = new ArrayList<>(numThreads);
for (int i = 0; i < numThreads; i++) {
int threadIdx = i;
futures.add(executor.submit(new Callable<String>() {
@Override
public String call() throws Exception {
try {
// Get an input from the queue (blocking)
int val = queue.take();
return "Thread " + threadIdx + " got value " + val;
} catch (InterruptedException e) {
// Thrown once Future::cancel(true) is called
System.out.println("Thread " + threadIdx + " got interrupted");
// This value is returned to the Future, but can never
// be read, since the caller will get a CancellationException
return "Thread " + threadIdx + " got no value";
}
}
}));
}
// Enqueue (numThreads - 1) values into the queue, so that one thread blocks
for (int i = 0; i < numThreads - 1; i++) {
queue.add(100 + i);
}
// Cancel all futures
for (int i = 0; i < futures.size(); i++) {
Future<String> future = futures.get(i);
// Cancel the Future -- this doesn't throw an exception until
// the get() method is called
future.cancel(/* mayInterruptIfRunning = */ true);
try {
System.out.println(future.get());
} catch (CancellationException e) {
System.out.println("Future " + i + " was cancelled");
}
}
} finally {
// Terminate main after all threads have shut down (this call does not block,
// so main will exit before the threads stop running)
executor.shutdown();
}
}
}
Каждый раз, когда вы запускаете это, вывод будет отличаться, но вот один прогон:
Future 1 was cancelled
Future 0 was cancelled
Thread 2 got value 100
Thread 3 got value 101
Thread 1 got interrupted
Это показывает, что поток 2 и поток 3 завершены до вызова Future::cancel()
. Поток 1 был отменен, поэтому внутренне InterruptedException
было брошено, а внешне CancellationException
было брошено. Поток 0 был отменен, прежде чем он начал работать. (Обратите внимание, что индексы потоков в общем случае не коррелируют с индексами Future
, поэтому Future 0 was cancelled
может соответствовать либо отмене потока 0, либо потока 1, и то же самое для Future 1 was cancelled
.)
Дополнительно: Один из способов достижения того же эффекта с помощью Executor::execute
(который не возвращает ссылку Future
) вместо Executor::submit
состоит в создании ThreadPoolExecutor
с пользовательским ThreadFactory
, и пусть ваша ThreadFactory
записывает ссылку в параллельную коллекцию (например, параллельную очередь) для каждого созданного потока. Затем, чтобы отменить все потоки, вы можете просто вызвать Thread::interrupt()
во всех ранее созданных потоках. Однако вам нужно будет иметь дело с условием гонки, что новые потоки могут быть созданы, пока вы прерываете существующие потоки. Чтобы справиться с этим, установите флаг AtomicBoolean
, видимый для ThreadFactory
, который говорит ему не создавать больше потоков, затем, как только это будет установлено, отмените существующие потоки.