Отправка ответа во временную очередь ActiveMQ с использованием только имени ReplyTo - PullRequest
0 голосов
/ 07 сентября 2011

В последнее время я пытался заставить шаблон Reply-To работать в Apache NMS / ActiveMQ, и у меня были проблемы с отправкой сообщений во временные очереди, используя только имя временной очереди.

project - диспетчерская служба, которая получает запросы от шины и отправляет их другому процессу / среде выполнения (на основе сложных критериев маршрутизации) для обработки запроса.Затем этот отдельный процессор использует имя очереди ответа и идентификатор корреляции для создания ответа и отправки его исходному запрашивающему на том же посреднике, но на другом соединении.

Проблема заключается в том, что кажется, что вы можете отправлять тольково временную очередь (или тему), если у вас есть ссылка на объект IDestination из заголовка сообщения NMSReplyTo.Если эта ссылка потеряна, невозможно отправить сообщения во временную очередь (или тему), просто используя ее имя.

Иллюстрацией этой проблемы является эта простая служба «Понг», которая прослушивает очередь сообщений ивыдает запрос запрашивающей стороне, используя содержимое заголовка Reply-To NMS.Он имитирует отправку запроса другому процессу, просто вызывая метод ProcessMessage (string, string).

    using System;
    using Apache.NMS;

    namespace PongService
    {
        /// <summary>Simple request dispatcher which mimics dispatching requests to other workers in "The Cloud"</summary>
        class PongService
        {
            static ISession session = null;
            static IMessageProducer producer = null;

            public static void Main(string[] args)
            {
                Uri connecturi = new Uri("activemq:tcp://localhost:61616");
                Console.WriteLine("Connecting to " + connecturi);

                IConnectionFactory factory = new NMSConnectionFactory(connecturi);
                IConnection connection = factory.CreateConnection();
                session = connection.CreateSession();

                IDestination destination = session.GetQueue("PONG.CMD");
                Console.WriteLine("Using destination: " + destination);

                producer = session.CreateProducer(null);

                IMessageConsumer consumer = session.CreateConsumer(destination);

                connection.Start();

                consumer.Listener += new MessageListener(OnMessage);

                Console.WriteLine("Press any key to terminate Pong service . . .");

                // loop until a key is pressed
                while (!Console.KeyAvailable)
                {
                    try { System.Threading.Thread.Sleep(50); }
                    catch (Exception ex) { Console.Error.WriteLine(ex.Message + "\r\n" + ex.StackTrace); }
                } // loop

                Console.Write("Closing connection...");
                consumer.Close();
                producer.Close();
                session.Close();
                connection.Close();
                Console.WriteLine("done.");
            }


            /// <summary>Consumer call-back which receives requests and dispatches them to available workers in 'The Cloud'</summary>
            /// <param name="receivedMsg">The message received on the request queue.</param>
            protected static void OnMessage(IMessage receivedMsg)
            {
                // mimic the operation of passing this request to an external processor which can connect 
                // to the broker but will not have references to the session objects including destinations
                Console.WriteLine("Sending request to an external processor");
                ProcessMessage(receivedMsg.NMSReplyTo.ToString(), receivedMsg.NMSCorrelationID.ToString());
            }


            /// <summary>Models a worker in another process/runtime.</summary>
            /// <param name="queuename">Where to send the results of processing</param>
            /// <param name="crid">Correlation identifier of the request.</param>
            protected static void ProcessMessage(string queuename, string crid)
            {
                ITextMessage response = session.CreateTextMessage("Pong!");
                response.NMSCorrelationID = crid;

                IDestination destination = session.GetQueue(queuename);

                Console.WriteLine("Sending response with CRID of '" + crid + "' to " + queuename + "'");
                try
                {
                    producer.Send(destination, response);
                }
                catch (Exception ex)
                {
                    Console.Error.WriteLine("Could not send response: " + ex.Message);
                }

            }

        }

    }

Теперь для клиента.Он просто создает временную очередь, начинает ее прослушивать, а затем отправляет запрос в очередь, которую прослушивает наша служба «Понг».Сообщение с запросом содержит IDestination временной очереди.

    using System;
    using System.Threading;
    using Apache.NMS;
    using Apache.NMS.Util;

    namespace PongClient
    {
        class PongClient
        {
            protected static AutoResetEvent semaphore = new AutoResetEvent(false);
            protected static ITextMessage message = null;
            protected static TimeSpan receiveTimeout = TimeSpan.FromSeconds(3);

            public static void Main(string[] args)
            {
                Uri connecturi = new Uri("activemq:tcp://localhost:61616");
                Console.WriteLine("About to connect to " + connecturi);

                IConnectionFactory factory = new NMSConnectionFactory(connecturi);

                IConnection connection = factory.CreateConnection();
                ISession session = connection.CreateSession();

                IDestination temporaryDestination = session.CreateTemporaryQueue();
                Console.WriteLine("Private destination: " + temporaryDestination);

                IDestination destination = session.GetQueue("PONG.CMD");
                Console.WriteLine("Service destination: " + destination);


                IMessageConsumer consumer = session.CreateConsumer(destination);
                consumer.Listener += new MessageListener(OnMessage);

                IMessageProducer producer = session.CreateProducer(destination);

                connection.Start();

                // Send a request message
                ITextMessage request = session.CreateTextMessage("Ping");
                request.NMSCorrelationID = Guid.NewGuid().ToString();
                request.NMSReplyTo = temporaryDestination;
                producer.Send(request);

                // Wait for the message
                semaphore.WaitOne((int)receiveTimeout.TotalMilliseconds, true);
                if (message == null)
                {
                    Console.WriteLine("Timed-Out!");
                }
                else
                {
                    Console.WriteLine("Received message with ID:   " + message.NMSMessageId);
                    Console.WriteLine("Received message with text: " + message.Text);
                }
            }



            protected static void OnMessage(IMessage receivedMsg)
            {
                message = receivedMsg as ITextMessage;
                semaphore.Set();
            }
        }
    }

Кажется, что процесс Pong работает правильно, только он завершает работу, создавая совершенно новую отдельную очередь из очереди, указанной в заголовке Reply-To.

Вот версии задействованных технологий:

  • Apache.NMS.ActiveMQ v1.5.1
  • Apache.NMS API v1.5.0
  • ActiveMQ 5.5.0
  • C # .NET 3.5

Этот вопрос относится к этому сообщению , в котором описана аналогичная проблема.Надеемся, что эти примеры помогут прояснить проблему в этом запросе.

Любая помощь или понимание решения будет принята с благодарностью.

Ответы [ 3 ]

1 голос
/ 07 сентября 2011

Вы фактически не устанавливаете заголовок reply-to в сообщении запроса от PongClient.

Попробуйте это:

ITextMessage request = session.CreateTextMessage("Ping");
request.NMSCorrelationID = Guid.NewGuid().ToString();
request.NMSReplyTo = temporaryDestination;
producer.Send(request);
0 голосов
/ 06 апреля 2013

Я бы порекомендовал использовать тему в качестве места назначения ответа, а ваш потребительский фильтр основывался на NMSCorrelationID.Это реализация, к которой я перешел после большого разочарования временными очередями.Это на самом деле имеет много преимуществ.

  1. Это сокращает интенсивное использование ресурсов на сервере (не нужно создавать / деконструировать временные очереди).
  2. Это позволяет использовать другого потребителя дляотслеживать ответ, отправленный обратно (вы никогда не сможете «заглянуть» во временную очередь).
  3. И это гораздо надежнее, поскольку тему можно передавать через логическое имя вместо определенного идентификатора токена (который вы теряете через соединения).
0 голосов
/ 20 февраля 2012

Вам нужно использовать IDestination, которое вы передали.

Calling

IDestination destination = session.GetQueue(queuename); 

это маленькое зло. Под прикрытием это вызывает CreateTeoraryQueue () заменяет существующую временную очередь новой с тем же именем, не сообщая вам.

...