Как прервать BlockingQueue, который блокирует на take ()? - PullRequest
78 голосов
/ 01 мая 2009

У меня есть класс, который берет объекты из BlockingQueue и обрабатывает их, вызывая take() в непрерывном цикле. В какой-то момент я знаю, что больше объектов не будет добавлено в очередь. Как мне прервать метод take(), чтобы он прекратил блокировку?

Вот класс, который обрабатывает объекты:

public class MyObjHandler implements Runnable {

  private final BlockingQueue<MyObj> queue;

  public class MyObjHandler(BlockingQueue queue) {
    this.queue = queue;
  }

  public void run() {
    try {
      while (true) {
        MyObj obj = queue.take();
        // process obj here
        // ...
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}

А вот метод, который использует этот класс для обработки объектов:

public void testHandler() {

  BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);  

  MyObjectHandler  handler = new MyObjectHandler(queue);
  new Thread(handler).start();

  // get objects for handler to process
  for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
    queue.put(i.next());
  }

  // what code should go here to tell the handler
  // to stop waiting for more objects?
}

Ответы [ 5 ]

64 голосов
/ 01 мая 2009

Если прерывание потока не является опцией, другой способ - поместить объект «маркер» или «команда» в очередь, который будет распознаваться как таковой MyObjHandler, и выходить из цикла.

13 голосов
/ 26 декабря 2013

Очень поздно, но надеюсь, что это поможет и другим, поскольку Я столкнулся с подобной проблемой и использовал подход poll, предложенный erickson выше с некоторыми незначительными изменениями,

class MyObjHandler implements Runnable 
{
    private final BlockingQueue<MyObj> queue;
    public volatile boolean Finished;  //VOLATILE GUARANTEES UPDATED VALUE VISIBLE TO ALL
    public MyObjHandler(BlockingQueue queue) 
    {
        this.queue = queue;
        Finished = false;
    }
    @Override
    public void run() 
    {        
        while (true) 
        {
            try 
            {
                MyObj obj = queue.poll(100, TimeUnit.MILLISECONDS);
                if(obj!= null)//Checking if job is to be processed then processing it first and then checking for return
                {
                    // process obj here
                    // ...
                }
                if(Finished && queue.isEmpty())
                    return;

            } 
            catch (InterruptedException e) 
            {                   
                return;
            }
        }
    }
}

public void testHandler() 
{
    BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100); 

    MyObjHandler  handler = new MyObjHandler(queue);
    new Thread(handler).start();

    // get objects for handler to process
    for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); )
    {
        queue.put(i.next());
    }

    // what code should go here to tell the handler to stop waiting for more objects?
    handler.Finished = true; //THIS TELLS HIM
    //If you need you can wait for the termination otherwise remove join
    myThread.join();
}

Это решило обе проблемы

  1. Отметил BlockingQueue, чтобы он знал, что ему не нужно больше ждать элементов
  2. Не прервано между ними, поэтому обработка блоков завершается только тогда, когда обрабатываются все элементы в очереди и нет элементов, которые необходимо добавить
13 голосов
/ 01 мая 2009
BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);
MyObjectHandler handler = new MyObjectHandler(queue);
Thread thread = new Thread(handler);
thread.start();
for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
  queue.put(i.next());
}
thread.interrupt();

Однако, если вы сделаете это, поток может быть прерван, пока в очереди все еще есть элементы, ожидающие обработки. Возможно, вы захотите использовать poll вместо take, что позволит истечь тайм-аут обработки и завершиться, когда он ждет некоторое время без ввода нового значения.

1 голос
/ 01 мая 2009

Прервать поток:

thread.interrupt()
0 голосов
/ 13 сентября 2012

Или не перебивай, это противно.

    public class MyQueue<T> extends ArrayBlockingQueue<T> {

        private static final long serialVersionUID = 1L;
        private boolean done = false;

        public ParserQueue(int capacity) {  super(capacity); }

        public void done() { done = true; }

        public boolean isDone() { return done; }

        /**
         * May return null if producer ends the production after consumer 
         * has entered the element-await state.
         */
        public T take() throws InterruptedException {
            T el;
            while ((el = super.poll()) == null && !done) {
                synchronized (this) {
                    wait();
                }
            }

            return el;
        }
    }
  1. когда производитель помещает объект в очередь, вызывать queue.notify(), если он заканчивается, вызывать queue.done()
  2. цикл while (! Queue.isDone () ||! Queue.isEmpty ())
  3. test take () возвращаемое значение для нуля
...