Несколько очередей блокировки, один потребитель - PullRequest
18 голосов
/ 06 марта 2012

У меня есть несколько BlockingQueues, содержащих сообщения для отправки.Можно ли иметь меньше потребителей, чем очередей?Я не хочу перебирать очереди и продолжать опрашивать их (занят ожиданием), и мне не нужен поток для каждой очереди.Вместо этого я хотел бы иметь один поток, который пробуждается, когда сообщение доступно в любой из очередей.

Ответы [ 2 ]

6 голосов
/ 02 июня 2015

LinkedBlockingMultiQueue делает то, что вы просите. Это не позволяет потребителю блокировать произвольные BlockingQueues, но можно создавать «подпоследовательности» из одной «многопоследовательности» и достигать того же эффекта. Производители предлагают в под-очередях, и потребители могут блокировать себя, опрашивая единственную множественную очередь, ожидая любого элемента.

Он также поддерживает приоритеты, то есть отбирает элементы из одних очередей перед рассмотрением других.

Пример:

LinkedBlockingMultiQueue<Int, String> q = new LinkedBlockingMultiQueue<>();
q.addSubQueue(1 /* key */, 10 /* priority */);
q.addSubQueue(2 /* key */, 10 /* priority */);
LinkedBlockingMultiQueue<Int, String>.SubQueue sq1 = q.getSubQueue(1);
LinkedBlockingMultiQueue<Int, String>.SubQueue sq2 = q.getSubQueue(2);

Тогда вы можете предложить и опросить:

sq1.offer("x1");
q.poll(); // "x1"
sq2.offer("x2");
q.poll(); // "x2"

Отказ от ответственности: я являюсь автором библиотеки.

6 голосов
/ 14 марта 2012

Один трюк, который вы могли бы сделать, - это иметь очередь из очередей. Поэтому вы должны иметь одну очередь блокировки, на которую подписываются все потоки. Затем, когда вы помещаете что-то в одну из ваших блокирующих очередей, вы также ставите свою очередь на блокировку в этой единственной очереди. Так что у вас будет что-то вроде:

BlockingQueue<WorkItem> producers[] = new BlockingQueue<WorkItem>[NUM_PRODUCERS];
BlockingQueue<BlockingQueue<WorkItem>> producerProducer = new BlockingQueue<BlockingQueue<WorkItem>>();

Тогда, когда вы получите новый рабочий элемент:

void addWorkItem(int queueIndex, WorkItem workItem) {
    assert queueIndex >= 0 && queueIndex < NUM_PRODUCERS : "Pick a valid number";
    //Note: You may want to make the two operations a single atomic operation
    producers[queueIndex].add(workItem);
    producerProducer.add(producers[queueIndex]);
}

Теперь ваши потребители могут все блокировать от производителяПроизводитель. Я не уверен, насколько ценна эта стратегия, но она выполняет то, что вы хотите.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...