Лучший способ отказаться от добавления в очередь после остановки менеджера - PullRequest
0 голосов
/ 19 ноября 2011

Есть идеи о том, как лучше заблокировать все добавления, когда был вызван стоп?

Кроме того, это первый раз, когда я использовал LockSupport, это лучший способ справиться с блокировкой в ​​очереди во время ожидания новой очереди?

import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.locks.LockSupport;

/**
 *
 * @author Marc D. Benstein
 */
public abstract class AbstractWorkerQueue<T> implements AutoCloseable {
    private final List<T> queue;
    private final Thread thread;

    public AbstractWorkerQueue(String threadName) {
        queue = Collections.synchronizedList(new LinkedList<T>());
        thread = new Thread(new QueueManager(), threadName);
    }

    public void offer(T item) {
        synchronized (queue) {
            queue.add(item);
        }
        LockSupport.unpark(thread);
    }

    public void clear() {
        synchronized (queue) {
            queue.clear();
        }
    }

    @Override
    public void close() {
        stop();
    }

    protected void start() {
        thread.start();
    }

    protected void stop() {
        offer(null);
        try {
            thread.join();
        } catch (InterruptedException ex) {
        }
    }

    protected abstract void process(T item);

    private class QueueManager implements Runnable {

        @Override
        public void run() {
            while (true) {
                T item = null;
                synchronized (queue) {
                    if (!(queue.isEmpty())) {
                        item = queue.remove(0);
                        if (item == null) {
                            // null dentotes end of queue
                            return;
                        }
                    }
                }
                if (item != null) {
                    process(item);
                } else {
                    LockSupport.park(thread);
                }
            }
        }
    }
}

Ответы [ 5 ]

1 голос
/ 19 ноября 2011

first не перехватывать молча InterruptedException по крайней мере сбросить флаг с Thread.currentThread().interrupt(); в этом перехвате

second существуют реализации потока, ориентированные на поток, которые лучше настроить, проверьте java.util.concurrent пакет

и существует вероятность того, что поток может добавить T и вызвать unpark() до того, как QueueManager достигнет park() сразу после удаления последнего,вам лучше wait в очереди и notify в очереди внутри синхронизированного блока

0 голосов
/ 20 ноября 2011

Упрощенная версия вашего кода.


Примечание предложение теперь возвращает логическое значение, как стандарт java.util.Queue.Я предпочитаю Throwable для флага закрытия, хотя здесь я ничего не использую, для отладки очень полезно записывать момент вызова close ().Кроме этого удален LinkedList и синхронизирован.

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.LockSupport;

public abstract class AbstractWorkerQueue<T> {
    private final ConcurrentLinkedQueue<T> queue;
    private final Thread thread;

    private volatile RuntimeException closed;//records the closure point

    public AbstractWorkerQueue(String threadName) {
        queue = new ConcurrentLinkedQueue<T>();
        thread = new Thread(new QueueManager(), threadName);
    }

    public boolean offer(T item) {
        if (closed!=null)
            return false;

        queue.offer(item);      
        LockSupport.unpark(thread);
        return true;
    }

    public void clear() {
        queue.clear();//this may not be carried properly
    }

    public void close() {
        stop();
    }

    protected void start() {
        thread.start();
    }

    protected void stop() {
        if (closed!=null)
            return;
        closed = new IllegalStateException("closure point");//should be CAS'd but meh
        LockSupport.unpark(thread);
        try {
            thread.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

    protected abstract void process(T item);

    private class QueueManager implements Runnable {
        @Override
        public void run() {
            while (closed==null) {
                final T item = queue.poll();
                if (item != null) {
                    process(item);
                } else {
                    LockSupport.park(thread);
                }
            }
        }
    }
}
0 голосов
/ 19 ноября 2011

Я не могу проверить это прямо сейчас, но как насчет:

volatile boolean closed = false;
public void offer(T item) {
    if ( !closed ) {
        synchronized (queue) {
            queue.add(item);
            if ( item == null ) {
                closed = true; 
            }
        }
        // Not sure if this should be outside or inside the if.
        LockSupport.unpark(thread);
    }
}

Добавлено : Я думаю, что единственным побочным эффектом состояния гонки здесь будет возможностьнебольшое количество дополнительных элементов, отправляемых в очередь после нуля.До тех пор, пока вы обращаетесь с этим (что вам кажется) у вашего потребителя, проблем не должно быть.Однако, если это было проблемой, вы могли бы сделать:

volatile boolean closed = false;
public void offer(T item) {
    if ( !closed ) {
        synchronized (queue) {
            if ( !closed ) {
                queue.add(item);
                if ( item == null ) {
                    closed = true; 
                }
            }
        }
        // Not sure if this should be outside or inside the if.
        LockSupport.unpark(thread);
    }
}
0 голосов
/ 19 ноября 2011

Добавить новую переменную переменной экземпляра isStopped в класс AbstractWorkerQueue со значением по умолчанию, равным false. Включите флаг в true после вызова метода stop.

Вы можете генерировать IllegalStateException при любых последующих запросах offer () или возвращать, не добавляя его в очередь.

0 голосов
/ 19 ноября 2011

Бросьте IllegalStateException, возможно, подкласс этого.

...