Допустим, у меня есть очередь под названием уведомления, и я отправляю 1000 сообщений в эту очередь. У меня также есть 20 потребителей этой очереди. Я бы хотел, чтобы каждый из этих потребителей взял одно сообщение из очереди, обработал его, а затем получил следующее доступное сообщение.
То, что происходит сейчас, заключается в том, что некоторые потребители получают много сообщений и обрабатывают их, в то время как другие потребители ничего не делают.
Ниже приведен полный тестовый пример, который демонстрирует то, что я вижу. В действительности все потребители - это отдельные процессы на отдельных машинах, но это точно повторяет поведение:
using System;
using System.Threading;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
using Spring.Messaging.Nms.Core;
namespace QueueTest
{
class Program
{
private static object _syncObj = new object();
private static int _row;
static void Main()
{
var connectionFactory = new ConnectionFactory("tcp://localhost:61616");
var template = new NmsTemplate(connectionFactory);
for (int i = 0; i < 500; i++)
{
template.ConvertAndSend("notifications", "hello");
}
for (int i = 0; i < 10; i++)
{
ThreadPool.QueueUserWorkItem(Spawn, i);
}
Console.ReadKey();
}
static void Spawn(object o)
{
int count = 0;
int threadId = (int)o;
var ts = new TimeSpan(0,0,0,5);
while (true)
{
var connectionFactory = new ConnectionFactory(
"tcp://localhost:61616"
+ "?nms.PrefetchPolicy.QueuePrefetch=1",
String.Format("{0}:{1}:{2}",
Environment.MachineName,
threadId,
"notifications"));
using (var conn = connectionFactory.CreateConnection())
{
conn.ClientId = String.Format("{0}:{1}:{2}", Environment.MachineName, threadId, "notifications");
conn.Start();
using (var session = conn.CreateSession())
{
var queue = session.GetQueue("notifications");
using (var consumer = session.CreateConsumer(queue))
{
IMessage msg = consumer.Receive(ts);
while (msg != null)
{
lock (_syncObj)
{
Interlocked.Increment(ref _row);
Console.WriteLine("{0}. {1} processed {2}", _row, threadId, ++count);
if (_row == 500)
{
Environment.Exit(0);
}
}
Thread.Sleep(1000);
msg = consumer.Receive(ts);
}
Console.WriteLine("{0} nothing to process", threadId);
}
}
conn.Stop();
}
}
}
}
}
Как мне сделать так, чтобы сообщения распределялись среди всех потребителей более равномерно?