Qpid - сообщения балансировки нагрузки для потребителей, которые имеют различные характеристики производительности - PullRequest
2 голосов
/ 07 марта 2011

У меня есть следующий сценарий:

Я помещаю 100 сообщений в очередь, совместно используемую двумя потребителями.Оба подписчика подписываются на очередь в режиме предварительного захвата и явном режиме.После обработки каждого сообщения каждый подписчик принимает сообщение, чтобы удалить его из очереди.Псевдокоды выглядят так:

OnMessageTransfer(message) :
    DoSomethingWithMessage(message)
    Session.MessageAccept(message)

Сообщения сбалансированы по нагрузке правильно, каждое сообщение обрабатывается один и только один раз, но мы обнаружили, что оно не учитывает время обработки для каждого потребителя.Например, предположим, что потребителю A требуется 50 мс для обработки сообщения, а потребителю B - 5 секунд.В идеале потребитель B должен начать обработку 1 сообщения, а потребитель A должен обработать 99 других.Однако происходит то, что потребитель B фактически обработает 25 сообщений за 50 секунд, в то время как потребитель A обработает остальные 75 через ~ 4 секунды и простаивает.Клиентский API, кажется, предварительно выбирает сообщения, что явно не оптимально в этой ситуации.

Как мы можем решить эту проблему?

Мы используем Qpid cpp 0.5 и полностью управляемый c # 0-10 клиентский API, а не привязки cpp (но, насколько я понимаю, это поведение не связано с реализацией API)

С уважением,

Julien

1 Ответ

2 голосов
/ 08 марта 2011

Это поведение можно изменить, настроив поток сообщений для очереди. Чтобы избежать предварительной выборки сообщений, необходимо установить кредит в 1 сообщение и обновлять после обработки каждого сообщения. Псевдокод будет выглядеть так:

InitSubscription(queue) : 
    MessageSubscribe(queue, AcceptMode.Explicit, AcquireMode.PreAcquired)
    MessageSetFlowMode(queue, FlowMode.Credit)
    MessageFlow(queue, CreditUnit.Byte, MAX_BYTES)
    MessageFlow(queue, CreditUit.Message, 1) // will disable prefetch

OnMessageTransfer(message) :
    DoSomethingWithMessage(message)
    MessageAccept(message)
    MessageFlow(queue, CreditUit.Message, 1) // reissue a credit for 1 and only 1 message
...