Сделать ActiveMQ повторное получение сообщения при сбое - PullRequest
0 голосов
/ 27 мая 2019

Я работаю над приложением .NET, взаимодействующим со службой ActiveMQ, и пытаюсь достичь самого простого:

  1. Производитель публикует сообщение в ActiveMQ.
  2. Абонент подключается к ActiveMQ и получает сообщение.
  3. Если подписчику не удается обработать сообщение (если выдается исключение), ActiveMQ снова отправляет сообщение подписчику, пока не будет достигнуто MaxRedeliveries.
  4. Если после определенного числа попыток сообщение все еще не обработано, ActiveMQ помещает его в другую очередь, содержащую недоставленные сообщения (из того, что я понял до сих пор, это обычно называют очередью мертвых писем, DLQ).

Я был почти уверен, что ActiveMQ должен вести себя «из коробки», но у меня это просто не работает. Основываясь на коде этой (очень старой) статьи , я придумал этот пример:

using Apache.NMS;
using Apache.NMS.ActiveMQ;
using Spring.Messaging.Nms.Core;
using Spring.Messaging.Nms.Listener;
using System;

namespace MqListener
{
    class Program
    {
        private const string URI = "tcp://localhost:61616";
        private const string DESTINATION = "test.queue";


        static void Main(string[] args)
        {
            Publish();
            Subscribe();
        }

        private static void Publish()
        {
            var connectionFactory = new ConnectionFactory(URI);
            var template = new NmsTemplate(connectionFactory);
            template.ConvertAndSend(DESTINATION, "Hello from the sender.");
        }

        private static void Subscribe()
        {
            try
            {
                var connectionFactory = new ConnectionFactory(URI);
                connectionFactory.RedeliveryPolicy.MaximumRedeliveries = 10;

                using (var listenerContainer = new SimpleMessageListenerContainer())
                {
                    listenerContainer.ConnectionFactory = connectionFactory;
                    listenerContainer.DestinationName = DESTINATION;
                    listenerContainer.MessageListener = new Listener();
                    listenerContainer.AfterPropertiesSet();

                    Console.WriteLine("Listener started.");
                    Console.WriteLine("Press <ENTER> to exit.");
                    Console.Read();
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
                Console.WriteLine("Press <ENTER> to exit.");
                Console.Read();
            }
        }
    }

    internal class Listener : IMessageListener
    {
        public Listener()
        {
            Console.WriteLine("Listener created");
        }

        public void OnMessage(IMessage message)
        {
            var textMessage = (Apache.NMS.ActiveMQ.Commands.ActiveMQTextMessage)message;
            Console.WriteLine($"{textMessage.NMSXDeliveryCount} {message.NMSRedelivered} {textMessage.Text}");

            throw new Exception(); // I want to emulate the failure during the processing
        }
    }
}

С помощью этого кода сообщение успешно отправляется, а затем передается в прослушиватель, но независимо от того, что я делаю, ActiveMQ обрабатывает его, как если бы оно было успешно доставлено. Однако несколько раз мне удавалось заставить его повторно доставлять одно и то же сообщение дважды (первый раз с message.NMSRedelivered = false и второй раз с message.NMSRedelivered = true), но не более. И что хуже всего, я даже не знаю, какие «твики» имели этот эффект.

Итак, вопрос в том, не понимаю ли я всю концепцию ActiveMQ или я пропускаю что-то очень простое? Буду признателен за любую помощь в этом отношении!

...