При каких условиях BlockingQueue.take выбрасывает прерванное исключение? - PullRequest
8 голосов
/ 24 декабря 2009

Допустим, у меня есть поток, который потребляет элементы, созданные другим потоком. Его метод запуска следующий: inQueue - BlockingQueue

boolean shutdown = false;
while (!shutdown) {
    try {
        WorkItem w = inQueue.take();
        w.consume();
    } catch (InterruptedException e) { 
        shutdown = true;
    }
}

Кроме того, другой поток будет сигнализировать, что больше нет рабочих элементов, прерывая этот рабочий поток. Метод take () выдаст прерванное исключение, если ему не нужно блокировать для получения следующего рабочего элемента. то есть, если производитель сообщает, что он завершил заполнение рабочей очереди, можно ли случайно оставить некоторые элементы в inQueue или пропустить прерывание?

Ответы [ 5 ]

4 голосов
/ 24 декабря 2009

Хороший способ сообщить об окончании блокирующей очереди - отправить в очередь «ядовитое» значение, указывающее на то, что произошло отключение. Это обеспечивает соблюдение ожидаемого поведения очереди. Вызов Thread.interupt (), вероятно, не очень хорошая идея, если вы хотите очистить очередь.

Чтобы указать код:

boolean shutdown = false;
while (!shutdown) {
    try {
        WorkItem w = inQueue.take();
        if (w == QUEUE_IS_DEAD)
          shutdown = true;
        else
          w.consume();
    } catch (InterruptedException e) { 
        // possibly submit QUEUE_IS_DEAD to the queue
    }
}
3 голосов
/ 24 декабря 2009

Согласно javadoc , метод take() выдает InterruptedException в случае прерывания во время ожидания.

2 голосов
/ 08 июля 2011

Я размышлял об этом и читал javadoc для take () . Я полагал, что это вызовет прерванное исключение только после получения всех элементов в очереди, поскольку, если в очереди есть элементы, не нужно было бы «ждать». Но я сделал небольшой тест:

package se.fkykko.slask;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

public class BlockingQueueTakeTest {

public static void main(String[] args) throws Exception {
    Runner t = new Runner();
    Thread t1 = new Thread(t);
    for (int i = 0; i < 50; i++) {
        t.queue.add(i);
    }
    System.out.println(("Number of items in queue: " + t.queue.size()));
    t1.start();
    Thread.sleep(1000);
    t1.interrupt();
    t1.join();
    System.out.println(("Number of items in queue: " + t.queue.size()));
    System.out.println(("Joined t1. Finished"));

}

private static final class Runner implements Runnable {
    BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(100);
    AtomicLong m_count = new AtomicLong(0);

    @Override
    public void run() {
        try {
            while (true) {
                queue.take();
                System.out.println("Took item " + m_count.incrementAndGet());
                final long start = System.currentTimeMillis();
                while ((System.currentTimeMillis() - start) < 100) {
                    Thread.yield(); //Spin wait
                }
            }
        }
        catch (InterruptedException ex) {
            System.out.println("Interrupted. Count: " + m_count.get());
        }
    }
}

}

Бегун возьмет 10-11 предметов, а затем завершит, т.е. take () сгенерирует InterruptedException, даже если в очереди еще есть предметы.

Сводка: используйте вместо этого подход «Ядовитая таблетка», тогда у вас будет полный контроль над тем, сколько осталось в очереди.

1 голос
/ 09 февраля 2019

Как правило, вы не можете прерывать потоки ExecutorService из внешнего кода, если вы использовали ExecutorService::execute(Runnable) для запуска потоков, потому что внешний код не имеет ссылки на Thread объекты каждого из работающих темы (см. конец этого ответа для решения, если вам нужно ExecutorService::execute). Однако если вы вместо этого используете ExecutorService::submit(Callable<T>) для отправки заданий, вы получаете Future<T>, который внутренне сохраняет ссылку на работающий поток, как только Callable::call() начинает выполнение. Этот поток может быть прерван с помощью вызова Future::cancel(true). Любой код внутри (или вызываемый) Callable, который проверяет состояние прерывания текущего потока, может поэтому быть прерван через ссылку Future. Это включает BlockingQueue::take(), который, даже если он заблокирован, будет реагировать на прерывание потока. (Методы блокировки JRE обычно просыпаются, если их прерывают, когда блокируются, понимают, что они были прерваны, и выдают InterruptedException.)

Подводя итог: Future::cancel() и Future::cancel(true) оба отменяют будущую работу , в то время как Future::cancel(true) также прерывает текущую работу (до тех пор, пока текущая работа реагирует на прерывание потока) , Ни один из двух вызовов cancel не влияет на работу, которая уже успешно завершена.

Обратите внимание, что после прерывания потока из-за отмены в поток будет добавлено InterruptException (например, BlockingQueue::take() в этом случае). Тем не менее, вы CancellationException будете отброшены в главном потоке при следующем вызове Future::get() на успешно отмененном Future (то есть на Future, который был отменен до его завершения). Это отличается от того, что вы обычно ожидаете: если неотмененный Callable выбрасывает InterruptedException, следующий вызов Future::get() будет выбрасывать InterruptedException, но если отмененный Callable выбрасывает InterruptedException, следующий позвонить на Future::get() через CancellationException.

Вот пример, который иллюстрирует это:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;

public class Test {
    public static void main(String[] args) throws Exception {
        // Start Executor with 4 threads
        int numThreads = 4;
        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(numThreads);
        try {
            // Set up BlockingQueue for inputs, and List<Future> for outputs
            BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
            List<Future<String>> futures = new ArrayList<>(numThreads);
            for (int i = 0; i < numThreads; i++) {
                int threadIdx = i;
                futures.add(executor.submit(new Callable<String>() {
                    @Override
                    public String call() throws Exception {
                        try {
                            // Get an input from the queue (blocking)
                            int val = queue.take();
                            return "Thread " + threadIdx + " got value " + val;
                        } catch (InterruptedException e) {
                            // Thrown once Future::cancel(true) is called
                            System.out.println("Thread " + threadIdx + " got interrupted");
                            // This value is returned to the Future, but can never
                            // be read, since the caller will get a CancellationException
                            return "Thread " + threadIdx + " got no value";
                        }
                    }
                }));
            }

            // Enqueue (numThreads - 1) values into the queue, so that one thread blocks
            for (int i = 0; i < numThreads - 1; i++) {
                queue.add(100 + i);
            }

            // Cancel all futures
            for (int i = 0; i < futures.size(); i++) {
                Future<String> future = futures.get(i);
                // Cancel the Future -- this doesn't throw an exception until
                // the get() method is called
                future.cancel(/* mayInterruptIfRunning = */ true);
                try {
                    System.out.println(future.get());
                } catch (CancellationException e) {
                    System.out.println("Future " + i + " was cancelled");
                }
            }
        } finally {
            // Terminate main after all threads have shut down (this call does not block,
            // so main will exit before the threads stop running)
            executor.shutdown();
        }
    }
}

Каждый раз, когда вы запускаете это, вывод будет отличаться, но вот один прогон:

Future 1 was cancelled
Future 0 was cancelled
Thread 2 got value 100
Thread 3 got value 101
Thread 1 got interrupted

Это показывает, что поток 2 и поток 3 завершены до вызова Future::cancel(). Поток 1 был отменен, поэтому внутренне InterruptedException было брошено, а внешне CancellationException было брошено. Поток 0 был отменен, прежде чем он начал работать. (Обратите внимание, что индексы потоков в общем случае не коррелируют с индексами Future, поэтому Future 0 was cancelled может соответствовать либо отмене потока 0, либо потока 1, и то же самое для Future 1 was cancelled.)

Дополнительно: Один из способов достижения того же эффекта с помощью Executor::execute (который не возвращает ссылку Future) вместо Executor::submit состоит в создании ThreadPoolExecutor с пользовательским ThreadFactory, и пусть ваша ThreadFactory записывает ссылку в параллельную коллекцию (например, параллельную очередь) для каждого созданного потока. Затем, чтобы отменить все потоки, вы можете просто вызвать Thread::interrupt() во всех ранее созданных потоках. Однако вам нужно будет иметь дело с условием гонки, что новые потоки могут быть созданы, пока вы прерываете существующие потоки. Чтобы справиться с этим, установите флаг AtomicBoolean, видимый для ThreadFactory, который говорит ему не создавать больше потоков, затем, как только это будет установлено, отмените существующие потоки.

0 голосов
/ 24 декабря 2009

Пакет java.concurrency.utils был разработан и реализован одними из лучших разработчиков параллельного программирования. Кроме того, прерывание потоков как средство их прекращения явно подтверждается их книгой «Параллелизм Java на практике». Поэтому я был бы крайне удивлен, если какие-либо элементы остались в очереди из-за прерывания.

...