activemessaging с stomp и activemq.prefetchSize = 1 - PullRequest
0 голосов
/ 04 июня 2010

У меня есть ситуация, когда у меня есть один брокер activemq с 2 очередями, Q1 и Q2. У меня есть два потребителя на основе рубина, использующих активное сообщение. Давайте назовем их C1 и C2. Оба потребителя подписываются на каждую очередь. Я устанавливаю activemq.prefetchSize = 1 при подписке на каждую очередь. Я также устанавливаю ack = client.

Рассмотрим следующую последовательность событий:

1) Сообщение, запускающее длительное задание, публикуется в очередь Q1. Назовите это M1.

2) M1 отправляется потребителю C1, начиная длительную операцию.

3) Два сообщения, запускающие короткие задания, публикуются в очередь Q2. Назовите эти M2 и M3.

4) M2 отправляется на C2, который быстро выполняет короткое задание.

5) M3 отправляется на C1, хотя C1 все еще работает на M1. Он может отправлять в C1, потому что prefetchSize = 1 устанавливается для подписки в очереди, а не для соединения. Поэтому тот факт, что сообщение Q1 уже отправлено, не останавливает отправку одного сообщения Q2.

Поскольку потребители активных сообщений являются однопоточными, общий результат заключается в том, что M3 долго сидит и ждет на C1, пока C1 не завершит обработку M1. Таким образом, M3 долгое время не обрабатывается, несмотря на то, что потребитель C2 бездействует (поскольку он быстро завершает работу с сообщением M2).

По сути, всякий раз, когда выполняется длинное задание Q1, а затем создается целая куча коротких заданий Q2, ровно одно из коротких заданий Q2 застревает у потребителя, ожидающего завершения длинного задания Q1.

Есть ли способ установить prefetchSize на уровне соединения, а не на уровне подписки? Я действительно не хочу, чтобы какие-либо сообщения отправлялись в C1, пока он обрабатывает M1. Другой альтернативой является то, что я мог бы создать потребителя, выделенного для обработки Q1, а затем пригласить других потребителей, выделенных для обработки Q2. Но я бы предпочел этого не делать, поскольку сообщения Q1 нечасты - выделенные потребители Q1 большую часть дня бездействуют, связывая память.

Ответы [ 2 ]

2 голосов
/ 04 июня 2010

activemq.prefetchSize доступен только для сообщения SUBSCRIBE, но не CONNECT, в соответствии с документами ActiveMQ для их расширенных заголовков stomp (http://activemq.apache.org/stomp.html). Вот соответствующая информация:

глагол: ПОДПИСАТЬСЯ

header: activemq.prefetchSize

тип: int

описание: указывает максимальное количество ожидающих сообщений, которые будут быть отправленным клиенту. Однажды это максимум достигнут не больше сообщений отправляются до клиента подтверждает сообщение. Установите 1 для очень честное распространение сообщений через потребителей, где обработка сообщения могут быть медленными.

Мое чтение и опыт работы с этим состоят в том, что, поскольку M1 не был подтвержден (b / c, у вас включено подтверждение клиента), этот M1 должен быть сообщением 1, разрешенным prefetchSize = 1, установленным в подписке. Я удивлен, узнав, что это не сработало, но, возможно, мне нужно провести более подробный тест. Ваши настройки должны соответствовать желаемому поведению.

Я слышал от других о бреде о рассылке activemq, поэтому возможно, что это ошибка используемой версии.

Одно из предложений, которое я хотел бы сделать, это либо прослушать сетевой трафик, чтобы узнать, получает ли M1 по какой-то причине подтверждение, либо сгенерировать некоторые операторы put в гем ruby ​​stomp, чтобы посмотреть сообщение (на этом я обычно заканчиваю делать при отладке проблем с топами).

Если у меня будет возможность попробовать это, я дополню свой комментарий своими собственными результатами.

Одно предложение: вполне возможно, что несколько длинных обрабатывающих сообщений могут быть отправлены, и если число длинных обрабатывающих сообщений превышает ваше количество процессов, вы будете в этом исправлении, где ожидают сообщения быстрой обработки.

У меня, как правило, есть хотя бы один выделенный процесс, который просто выполняет быструю работу, или, другими словами, выделяет набор # процессов, которые просто выполняют более длительные задания. Если все потребительские процессы опроса слушают как длинные, так и короткие, это может привести к неоптимальным результатам независимо от того, что делает диспетчеризация. Группы процессов - это способ настроить потребителя на прослушивание поднабора адресатов: http://code.google.com/p/activemessaging/wiki/Configuration

имя группы процессоров, * List_of_processors

A processor group is a way to run the poller to only execute a subset of

процессоры, передавая имя группа в командной строке poller аргументы.

You specify the name of the processor as its underscored lowercase

версия. Так что если у вас есть FooBarProcessor и BarFooProcessor в группа процессоров, это будет выглядеть это:

    ActiveMessaging::Gateway.define do |s|
      ...
      s.processor_group :my_group, :foo_bar_processor, :bar_foo_processor
    end

The processor group is passed into the poller like the following:

    ./script/poller start -- process-group=my_group
0 голосов
/ 04 июня 2010

Я не уверен, поддерживает ли ActiveMessaging это, но вы можете отменить подписку других ваших потребителей, когда придет длинное сообщение об обработке, а затем повторно подписать их после обработки.

Это должно дать вам желаемый эффект.

...