Тематическая параллельная очередь в простой Java - PullRequest
1 голос
/ 19 мая 2011

Прежде, чем я заново изобрету колесо, существует ли параллельная очередь по теме в простой Java? У меня есть следующие требования:

  • Несколько читателей / потребителей
  • Несколько авторов / продюсеров
  • Каждое сообщение должно потребляться каждым (активным) потребителем
  • После того, как каждый потребитель прочитает сообщение, оно должно стать мусором (то есть больше нет ссылок)
  • Запись в очередь не должна быть O (N) для количества потребителей
  • Параллельно, предпочтительно неблокирующе
  • Не на основе JMS: это для более легкой / встраиваемой среды

Это почти все, что мне нужно. Есть указатели?

Ответы [ 4 ]

3 голосов
/ 19 мая 2011

По сути, вы говорите о мультиплексировании, и нет, в стандартной библиотеке чего-то нет, но создать его довольно просто.Предполагая, что ваши клиенты не заинтересованы в сообщениях, опубликованных до их подписки, вам нужен пул очередей для каждого потребителя, и публикация просто предлагает элемент для каждой очереди:

public class Multiplexer<M> {
  private final List<BlockingQueue<M>> consumers 
    = new CopyOnWriteArrayList<BlockingQueue<M>>();

  public void publish(M msg) {
    for (BlockingQueue<M> q : consumers) {
      q.offer(msg);
    }
  }

  public void addConsumer(BlockingQueue<M> consumer) {
    consumers.add(consumer);
  }
}

Эта версия позволяет потребителям использовать любыереализация блокировки очереди, которую они могли бы хотеть.Очевидно, вы могли бы предоставить стандартную реализацию и хороший интерфейс для клиента, если хотите.

0 голосов
/ 19 мая 2011

Иметь только одного псевдо-потребителя и позволить реальным потребителям зарегистрироваться у псевдо-потребителя. Когда производитель отправляет сообщение, псевдо-потребитель просыпается и потребляет сообщение. При использовании сообщения псевдо-потребитель создает отдельный Runnable для каждого зарегистрированного реального потребителя и выполняет их в пуле потоков.

0 голосов
/ 19 мая 2011

Самая простая стратегия состоит в том, чтобы передать сообщение каждому потребителю, у меня не было бы так много потребителей, что важно число потребителей. Вы можете добавить сообщения десяткам потребителей за несколько микросекунд.

Один из способов избежать этого - иметь кольцевой буфер с большим количеством считывателей. Это сложно реализовать и означает, что потребители будут ограничены в количестве источников сообщений, которые они могут иметь.

0 голосов
/ 19 мая 2011

3-е условие не в простой java, но вы можете использовать неблокирующую связанную очередь с отдельным заголовком для каждого потребителя (вы можете полагаться на GC для сбора узлов без ссылок)

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...