ActiveMQ - потребители не разделяют нагрузку - PullRequest
0 голосов
/ 01 апреля 2011

Допустим, у меня есть очередь под названием уведомления, и я отправляю 1000 сообщений в эту очередь. У меня также есть 20 потребителей этой очереди. Я бы хотел, чтобы каждый из этих потребителей взял одно сообщение из очереди, обработал его, а затем получил следующее доступное сообщение.

То, что происходит сейчас, заключается в том, что некоторые потребители получают много сообщений и обрабатывают их, в то время как другие потребители ничего не делают.

Ниже приведен полный тестовый пример, который демонстрирует то, что я вижу. В действительности все потребители - это отдельные процессы на отдельных машинах, но это точно повторяет поведение:

using System;
using System.Threading;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
using Spring.Messaging.Nms.Core;

namespace QueueTest
{
    class Program
    {
        private static object _syncObj = new object();
        private static int _row;

        static void Main()
        {
            var connectionFactory = new ConnectionFactory("tcp://localhost:61616");
            var template = new NmsTemplate(connectionFactory);

            for (int i = 0; i < 500; i++)
            {
                template.ConvertAndSend("notifications", "hello");
            }

            for (int i = 0; i < 10; i++)
            {
                ThreadPool.QueueUserWorkItem(Spawn, i);
            }

            Console.ReadKey();
        }

        static void Spawn(object o)
        {
            int count = 0;
            int threadId = (int)o;

            var ts = new TimeSpan(0,0,0,5);
            while (true)
            {
                var connectionFactory = new ConnectionFactory(
                                "tcp://localhost:61616"
                                + "?nms.PrefetchPolicy.QueuePrefetch=1",
                                String.Format("{0}:{1}:{2}", 
                Environment.MachineName, 
                threadId, 
                "notifications"));
                using (var conn = connectionFactory.CreateConnection())
                {
                    conn.ClientId = String.Format("{0}:{1}:{2}", Environment.MachineName, threadId, "notifications");
                    conn.Start();

                    using (var session = conn.CreateSession())
                    {
                        var queue = session.GetQueue("notifications");
                        using (var consumer = session.CreateConsumer(queue))
                        {
                            IMessage msg = consumer.Receive(ts);
                            while (msg != null)
                            {
                                lock (_syncObj)
                                {
                                    Interlocked.Increment(ref _row);
                                    Console.WriteLine("{0}.  {1} processed {2}", _row, threadId, ++count);
                                    if (_row == 500)
                                    {
                                        Environment.Exit(0);
                                    }
                                }
                                Thread.Sleep(1000);
                                msg = consumer.Receive(ts);
                            }
                            Console.WriteLine("{0} nothing to process", threadId);
                        }
                    }
                    conn.Stop();
                }
            }
        }
    }
}

Как мне сделать так, чтобы сообщения распределялись среди всех потребителей более равномерно?

Ответы [ 2 ]

1 голос
/ 03 апреля 2011

Проблема связана с буфером предварительной выборки потребителей. По умолчанию потребитель предварительно выберет около 1000 сообщений от брокера. Вам необходимо установить предварительную выборку для потребителей, чтобы они позволяли другим потребителям получать некоторые сообщения, в вашем случае вы можете установить предварительную выборку, равную единице, чтобы каждый потребитель распределял нагрузку равномерно. Взгляните на политику предварительной выборки в NMS.ActiveMQ API.

Вы можете установить предварительную выборку для URI подключения, это будет выглядеть примерно так: tcp: // localhost: 61616? Nms.PrefetchPolicy.QueuePrefetch = 1

0 голосов
/ 06 апреля 2011

Сразу после настройки щедрости я выяснил проблему ... :-(

Я использовал более старую версию NMS, которая не поддерживала политику предварительной выборки.

...