Насколько я понимаю, при указании AcknowledgementMode.ClientAcknowledge в производителе я должен иметь возможность остановить и перезапустить потребителя, и брокер повторно отправит все неподтвержденные сообщения в topi c. я не могу заставить это работать однако. (Я также пытался использовать прослушиватель onMessage у потребителя, но у меня такое же поведение.) Любая помощь приветствуется. вот мой производитель:
static void Main(string[] args) {
IConnectionFactory factory = new NMSConnectionFactory("tcp://localhost:61616");
using (IConnection connection = factory.CreateConnection()) {
connection.ClientId = "producer";
using (ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge)) {
IDestination destination = SessionUtil.GetDestination(session, "MY_TOPIC", DestinationType.Topic);
IMessageProducer producer = session.CreateProducer(destination);
while (!Console.KeyAvailable) {
string message = "[" + DateTime.UtcNow.ToString("yyyy/MM/dd HH:mm:ss.fff") + "]";
ITextMessage xtext_message = session.CreateTextMessage(message);
producer.Send(xtext_message, MsgDeliveryMode.Persistent, MsgPriority.Normal, new TimeSpan(1, 0, 0));
Console.WriteLine("Sent message:" + message);
Thread.Sleep(1000);
}
}
}
}
вот мой потребитель:
static void Main(string[] args) {
IConnectionFactory factory = new NMSConnectionFactory("tcp://localhost:61616/");
using (IConnection connection = factory.CreateConnection()) {
connection.ClientId = "consumer";
connection.Start();
using (ISession session = connection.CreateSession()) {
IMessageConsumer consumer = session.CreateConsumer(SessionUtil.GetDestination(session, "MY_TOPIC", DestinationType.Topic));
while (!Console.KeyAvailable) {
ITextMessage receivedMsg = consumer.Receive() as ITextMessage;
if (receivedMsg != null) {
Console.WriteLine("--> received: " + receivedMsg.Text);
}
}
}
}
}