Я нашел большую часть того, что искал здесь ActiveMQ - мне нужно повторно подписаться на очередь после того, как событие Listener сработало? , но я не могу понять, как сохранить слушателявыполнение, отличное от использования цикла while (true), который, я думаю, должен быть лучше, чтобы поддерживать слушателя активным, и в то же время иметь возможность любезно распоряжаться всеми процессами, если мне нужно остановить приложение.Пользователь Тим Биш утвердительно отвечает на заявление reckface «Означает ли это, что событие Listener будет запускаться для каждого сообщения без цикла while?», Но я не могу понять, как реализовать его без цикла while (true).
using System;
using System.Collections.Generic;
using System.Threading;
using System.Text;
using System.Threading.Tasks;
using Apache.NMS;
using Apache.NMS.Util;
using Apache.NMS.ActiveMQ;
using Apache.NMS.ActiveMQ.Commands;
using System.Runtime.Serialization.Json;
using System.IO;
using System.Configuration;
using System.Data;
using System.Data.SqlClient;
using System.Web;
namespace ActiveMQConnectionTest
{
class Program : IDisposable
{
private static IConnection connection;
private static ISession session;
private static SqlConnection sqlConn;
private static ActiveMQMessage msg;
private static MessageConsumer consumer;
private static DateTime timeStamp;
private static AutoResetEvent semaphore = new AutoResetEvent(false);
private static TimeSpan receiveTimeout = TimeSpan.FromSeconds(10);
static string un = ConfigurationManager.AppSettings["AMQUserName"];
static string pwd = ConfigurationManager.AppSettings["AMQPassword"];
static string url = ConfigurationManager.AppSettings["url"];
static string queue = ConfigurationManager.AppSettings["queue"];
private static string oldMsgId;
Program()
{
AppDomain.CurrentDomain.ProcessExit += CurrentDomain_ProcessExit;
sqlConn = new SqlConnection(ConfigurationManager.AppSettings["SQLConn"].ToString());
System.Uri uri = new Uri(url);
IConnectionFactory factory = new ConnectionFactory(uri);
try
{
connection = factory.CreateConnection(un, pwd);
connection.AcknowledgementMode = AcknowledgementMode.ClientAcknowledge;
session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
ActiveMQDestination dest = session.GetDestination(queue) as ActiveMQDestination;
consumer = session.CreateConsumer(dest) as MessageConsumer;
}
catch (NMSConnectionException ex)
{
Console.Write(ex.Message);
connection.Dispose();
}
try
{
connection.Start();
Console.WriteLine("Connection Started...");
Console.WriteLine("Session Created....");
}
catch (ConnectionFailedException ex)
{
connection.Close();
Console.Write(ex.Message);
}
}
~Program()
{
Dispose(false);
}
protected void Dispose(Boolean itIsSafeToAlsoFreeManagedObjects)
{
if (itIsSafeToAlsoFreeManagedObjects)
{
if (connection != null)
{
connection.Dispose();
}
if (session != null)
{
session.Dispose();
}
if (consumer != null)
{
consumer.Dispose();
}
}
}
public void Dispose()
{
Dispose(true);
}
static void ShutDown()
{
session.Close();
if (connection.IsStarted)
{
connection.Stop();
connection.Close();
connection.Dispose();
}
}
protected static void consumer_Listener(IMessage messasge)
{
messasge.Acknowledge();
msg = (ActiveMQMessage)messasge;
if (msg.MessageId.ToString() != oldMsgId)
{
oldMsgId = msg.MessageId.ToString();
msg.Acknowledge();
if (msg == null)
{
Console.WriteLine("No message received!");
}
else
{
Console.WriteLine("Received message with ID: " + msg.NMSMessageId);
Console.WriteLine("Received message with conetent: " + msg.ToString());
try
{
string s = ASCIIEncoding.ASCII.GetString(msg.Content);
timeStamp = DateTime.Now;
DataContractJsonSerializer deserializer = new DataContractJsonSerializer(typeof(I280Message));
var ms = new MemoryStream(msg.Content);
I280Message rows = (I280Message)deserializer.ReadObject(ms);
int MessageId = InsertPerson(rows.Person);
semaphore.Set();
}
catch (NMSException ex)
{
ShutDown();
Console.WriteLine(ex.Message);
}
}
}
else {
Console.WriteLine("Same old message....");
}
}
private static int InsertPerson(Person person)
{
using (SqlConnection sqlConn = new SqlConnection(ConfigurationManager.AppSettings["SQLConn"]))
{
using (SqlCommand sqlCmd = new SqlCommand("I280MessagePerson_tbl_isp", sqlConn))
{
sqlCmd.CommandType = CommandType.StoredProcedure;
sqlCmd.Parameters.AddWithValue("@BirthDate", person.BirthDate);
sqlCmd.Parameters.AddWithValue("@Gender", person.Gender);
sqlCmd.Parameters.AddWithValue("@VisaPermitType", person.VisaPermitType, null);
sqlCmd.Parameters.AddWithValue("@CitizenshipStatus", person.CitizenshipStatus, null);
sqlCmd.Parameters.AddWithValue("@ConfidentialFlag", person.ConfidentialFlag);
sqlCmd.Parameters.AddWithValue("@DeceasedFlag", person.DeceasedFlag, null);
sqlCmd.Parameters.AddWithValue("@TimeStamp", timeStamp);
SqlParameter paramPersonId = new SqlParameter("@MessageId", SqlDbType.Int);
paramPersonId.Direction = ParameterDirection.Output;
sqlCmd.Parameters.Add(paramPersonId);
sqlConn.Open();
try
{
sqlCmd.ExecuteNonQuery();
return (int)(sqlCmd.Parameters["@MessageId"].Value);
}
catch (SqlException ex)
{
Console.WriteLine(ex.Message);
if (sqlConn.State == ConnectionState.Open) sqlConn.Close();
return -1;
}
}
}
}
static void Main(string[] args)
{
using (Program pr = new Program())
{
consumer.Listener += new MessageListener(consumer_Listener);
}
//while (true)
//{
// consumer.Listener += new MessageListener(consumer_Listener);
// semaphore.WaitOne((int)receiveTimeout.TotalMilliseconds, true);
//}
//catch (NMSException ex)
//{
// ShutDown();
// Console.WriteLine(ex.Message);
//}
// Console.ReadLine();
}
}
public static class SqlParameterCollectionExtensions
{
public static SqlParameter AddWithValue(this SqlParameterCollection target, string parameterName, object value, object nullValue)
{
if (value == null || (string)value == "")
{
return target.AddWithValue(parameterName, nullValue ?? DBNull.Value);
}
return target.AddWithValue(parameterName, value);
}
}
}