Как правильно поддерживать приемник ActiveMQ в консольном приложении C # и корректно распоряжаться ресурсами до завершения работы? - PullRequest
0 голосов
/ 17 октября 2018

Я нашел большую часть того, что искал здесь ActiveMQ - мне нужно повторно подписаться на очередь после того, как событие Listener сработало? , но я не могу понять, как сохранить слушателявыполнение, отличное от использования цикла while (true), который, я думаю, должен быть лучше, чтобы поддерживать слушателя активным, и в то же время иметь возможность любезно распоряжаться всеми процессами, если мне нужно остановить приложение.Пользователь Тим Биш утвердительно отвечает на заявление reckface «Означает ли это, что событие Listener будет запускаться для каждого сообщения без цикла while?», Но я не могу понять, как реализовать его без цикла while (true).

using System;
using System.Collections.Generic;
using System.Threading;
using System.Text;
using System.Threading.Tasks;
using Apache.NMS;
using Apache.NMS.Util;
using Apache.NMS.ActiveMQ;
using Apache.NMS.ActiveMQ.Commands;
using System.Runtime.Serialization.Json;
using System.IO;
using System.Configuration;
using System.Data;
using System.Data.SqlClient;
using System.Web;



namespace ActiveMQConnectionTest
{
    class Program : IDisposable
    {
        private static IConnection connection;
        private static ISession session;
        private static SqlConnection sqlConn;
        private static ActiveMQMessage msg;
        private static MessageConsumer consumer;
        private static DateTime timeStamp;
        private static AutoResetEvent semaphore = new AutoResetEvent(false);
        private static TimeSpan receiveTimeout = TimeSpan.FromSeconds(10);
        static string un = ConfigurationManager.AppSettings["AMQUserName"];
        static string pwd = ConfigurationManager.AppSettings["AMQPassword"];
        static string url = ConfigurationManager.AppSettings["url"];
        static string queue = ConfigurationManager.AppSettings["queue"];
        private static string oldMsgId;



        Program() 
        {
            AppDomain.CurrentDomain.ProcessExit += CurrentDomain_ProcessExit;
        sqlConn = new SqlConnection(ConfigurationManager.AppSettings["SQLConn"].ToString());
        System.Uri uri = new Uri(url);
        IConnectionFactory factory = new ConnectionFactory(uri);



        try
        {
            connection = factory.CreateConnection(un, pwd);
            connection.AcknowledgementMode = AcknowledgementMode.ClientAcknowledge;
            session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
            ActiveMQDestination dest = session.GetDestination(queue) as ActiveMQDestination;
            consumer = session.CreateConsumer(dest) as MessageConsumer;
        }
        catch (NMSConnectionException ex)
        {
            Console.Write(ex.Message);
            connection.Dispose();
        }

        try
        {

            connection.Start();
            Console.WriteLine("Connection Started...");
            Console.WriteLine("Session Created....");

        }
        catch (ConnectionFailedException ex)
        {
            connection.Close();
            Console.Write(ex.Message);
        }

    }

    ~Program()
    {
        Dispose(false);
    }

    protected void Dispose(Boolean itIsSafeToAlsoFreeManagedObjects)
    {

        if (itIsSafeToAlsoFreeManagedObjects)
        {
            if (connection != null)
            {
                connection.Dispose();
            }
            if (session != null)
            {
                session.Dispose();
            }
            if (consumer != null)
            {
                consumer.Dispose();
            }
        }
    }

    public void Dispose()
    {
        Dispose(true); 
    }       

    static void ShutDown()
    {

        session.Close();
        if (connection.IsStarted)
        {
            connection.Stop();
            connection.Close();
            connection.Dispose();
        }
    }

    protected static void consumer_Listener(IMessage messasge)
    {
        messasge.Acknowledge();
        msg = (ActiveMQMessage)messasge;

       if (msg.MessageId.ToString() != oldMsgId)
       {
        oldMsgId = msg.MessageId.ToString();
        msg.Acknowledge();
        if (msg == null)
        {
            Console.WriteLine("No message received!");
        }
        else
        {
            Console.WriteLine("Received message with ID: " + msg.NMSMessageId);
            Console.WriteLine("Received message with conetent: " + msg.ToString());

            try
            {
                string s = ASCIIEncoding.ASCII.GetString(msg.Content);
                timeStamp = DateTime.Now;

                DataContractJsonSerializer deserializer = new DataContractJsonSerializer(typeof(I280Message));
                var ms = new MemoryStream(msg.Content);
                I280Message rows = (I280Message)deserializer.ReadObject(ms);
                int MessageId = InsertPerson(rows.Person);

                semaphore.Set();
            }
            catch (NMSException ex)
            {
                ShutDown();
                Console.WriteLine(ex.Message);
            }
        }
          }
        else {
            Console.WriteLine("Same old message....");
        }


    }



    private static int InsertPerson(Person person)
    {
        using (SqlConnection sqlConn = new SqlConnection(ConfigurationManager.AppSettings["SQLConn"]))
        {
            using (SqlCommand sqlCmd = new SqlCommand("I280MessagePerson_tbl_isp", sqlConn))
            {

                sqlCmd.CommandType = CommandType.StoredProcedure;
                sqlCmd.Parameters.AddWithValue("@BirthDate", person.BirthDate);
                sqlCmd.Parameters.AddWithValue("@Gender", person.Gender);
                sqlCmd.Parameters.AddWithValue("@VisaPermitType", person.VisaPermitType, null);
                sqlCmd.Parameters.AddWithValue("@CitizenshipStatus", person.CitizenshipStatus, null);
                sqlCmd.Parameters.AddWithValue("@ConfidentialFlag", person.ConfidentialFlag);
                sqlCmd.Parameters.AddWithValue("@DeceasedFlag", person.DeceasedFlag, null);
                sqlCmd.Parameters.AddWithValue("@TimeStamp", timeStamp);
                SqlParameter paramPersonId = new SqlParameter("@MessageId", SqlDbType.Int);
                paramPersonId.Direction = ParameterDirection.Output;
                sqlCmd.Parameters.Add(paramPersonId);




                sqlConn.Open();
                try
                {

                    sqlCmd.ExecuteNonQuery();
                    return (int)(sqlCmd.Parameters["@MessageId"].Value);
                }
                catch (SqlException ex)
                {
                    Console.WriteLine(ex.Message);
                    if (sqlConn.State == ConnectionState.Open) sqlConn.Close();
                    return -1;
                }

            }
        }


    }

    static void Main(string[] args)
    {           

            using (Program pr = new Program())
            {
                consumer.Listener += new MessageListener(consumer_Listener);
            }



            //while (true)
            //{

            //    consumer.Listener += new MessageListener(consumer_Listener);
            //    semaphore.WaitOne((int)receiveTimeout.TotalMilliseconds, true);
            //}
        //catch (NMSException ex)
        //{
        //    ShutDown();
        //    Console.WriteLine(ex.Message);
        //}

       // Console.ReadLine();



    }


}

public static class SqlParameterCollectionExtensions
{
    public static SqlParameter AddWithValue(this SqlParameterCollection target, string parameterName, object value, object nullValue)
    {
        if (value == null || (string)value == "")
        {
            return target.AddWithValue(parameterName, nullValue ?? DBNull.Value);
        }
        return target.AddWithValue(parameterName, value);
    }
}

}

Ответы [ 2 ]

0 голосов
/ 06 января 2019

Это не ответ на вопрос.Но я пишу это так, чтобы другие, кто сталкивался с проблемами, похожими на мои, могли ссылаться на ответ, когда сталкивались с проблемой.

Но я столкнулся с проблемой, когда соединение с ActiveMQ (с использованием C #) было активным, нетисключения, никаких ошибок, но все равно клиент не получал никаких сообщений, опубликованных сервером.

Изучив некоторое время на сайте Apache, я понял, что это происходит из-за тайм-аута.Я исправил это, используя следующую строку кода: -

brokerUri += "?transport.useInactivityMonitor=false&transport.useKeepAlive=true";

, где brokerUri - это мой activeMq uri.

0 голосов
/ 19 октября 2018

Итак, в основном ваша проблема в том, что вы позволяете своей программе завершиться.Когда это происходит, любая память, которая выделяется во время выполнения вашей программы, возвращается операционной системой, включая ваш объект Program и делегат получателя запросов.

Ваш друг на предыдущем вопросе, который вы задавали, отметил, чтоесли вы сохраняете прослушиватель ActiveMQ в качестве переменной-члена и сохраняете его в области действия, вы должны иметь возможность получать столько сообщений, сколько вы хотите, столько времени, сколько вы хотите, без добавления нового прослушивателя при каждом получении сообщения.

Итак, теперь все, что осталось сделать, - это не допустить выхода из вашей программы. Существует несколько способов сделать это:

  1. Вы можете изменитьваша программа для приложения Winforms, как описано в других сообщениях переполнения стека.Это приведет к созданию цикла сообщений.

  2. Вы можете прочитать символ с консоли.Это блокирующий вызов;программа будет ждать, пока пользователь не нажмет клавишу.В то же время ваш ActiveMQ по-прежнему сможет принимать события.

  3. Вы можете использовать while (iStillWantToReceiveMessages) { }

...