Я работаю над приложением .NET, взаимодействующим со службой ActiveMQ, и пытаюсь достичь самого простого:
- Производитель публикует сообщение в ActiveMQ.
- Абонент подключается к ActiveMQ и получает сообщение.
- Если подписчику не удается обработать сообщение (если выдается исключение), ActiveMQ снова отправляет сообщение подписчику, пока не будет достигнуто
MaxRedeliveries
.
- Если после определенного числа попыток сообщение все еще не обработано, ActiveMQ помещает его в другую очередь, содержащую недоставленные сообщения (из того, что я понял до сих пор, это обычно называют очередью мертвых писем, DLQ).
Я был почти уверен, что ActiveMQ должен вести себя «из коробки», но у меня это просто не работает. Основываясь на коде этой (очень старой) статьи , я придумал этот пример:
using Apache.NMS;
using Apache.NMS.ActiveMQ;
using Spring.Messaging.Nms.Core;
using Spring.Messaging.Nms.Listener;
using System;
namespace MqListener
{
class Program
{
private const string URI = "tcp://localhost:61616";
private const string DESTINATION = "test.queue";
static void Main(string[] args)
{
Publish();
Subscribe();
}
private static void Publish()
{
var connectionFactory = new ConnectionFactory(URI);
var template = new NmsTemplate(connectionFactory);
template.ConvertAndSend(DESTINATION, "Hello from the sender.");
}
private static void Subscribe()
{
try
{
var connectionFactory = new ConnectionFactory(URI);
connectionFactory.RedeliveryPolicy.MaximumRedeliveries = 10;
using (var listenerContainer = new SimpleMessageListenerContainer())
{
listenerContainer.ConnectionFactory = connectionFactory;
listenerContainer.DestinationName = DESTINATION;
listenerContainer.MessageListener = new Listener();
listenerContainer.AfterPropertiesSet();
Console.WriteLine("Listener started.");
Console.WriteLine("Press <ENTER> to exit.");
Console.Read();
}
}
catch (Exception ex)
{
Console.WriteLine(ex);
Console.WriteLine("Press <ENTER> to exit.");
Console.Read();
}
}
}
internal class Listener : IMessageListener
{
public Listener()
{
Console.WriteLine("Listener created");
}
public void OnMessage(IMessage message)
{
var textMessage = (Apache.NMS.ActiveMQ.Commands.ActiveMQTextMessage)message;
Console.WriteLine($"{textMessage.NMSXDeliveryCount} {message.NMSRedelivered} {textMessage.Text}");
throw new Exception(); // I want to emulate the failure during the processing
}
}
}
С помощью этого кода сообщение успешно отправляется, а затем передается в прослушиватель, но независимо от того, что я делаю, ActiveMQ обрабатывает его, как если бы оно было успешно доставлено. Однако несколько раз мне удавалось заставить его повторно доставлять одно и то же сообщение дважды (первый раз с message.NMSRedelivered = false
и второй раз с message.NMSRedelivered = true
), но не более. И что хуже всего, я даже не знаю, какие «твики» имели этот эффект.
Итак, вопрос в том, не понимаю ли я всю концепцию ActiveMQ или я пропускаю что-то очень простое?
Буду признателен за любую помощь в этом отношении!