Используя обмен topi c, я хотел бы иметь шаблон обмена сообщениями публикации / подписки со следующими функциями:
- Have " Издатель подтверждает"реализовано.
- Потребитель должен подтверждать каждое сообщение после его обработки.
- Используйте ключи маршрутизации для маршрутизации сообщений на один или несколько потребителей.
- Иметь постоянные очереди потребителей, чтобы, если приложение-получатель временно не работает, оно могло забирать сообщения из своей очереди, когда оно возвращается.
Итак, я создал 2 консольных приложения (Отправка и Получение), чтобы проверить вышесказанное.
Отправить
static void Main(string[] args)
{
Console.WriteLine(" Type [exit] to exit.");
Publisher publisher = new Publisher();
do
{
var userInput = Console.ReadLine();
if (userInput == "exit")
{
break;
}
publisher.SendMessageToBroker("localhost", "main", "user.update", userInput);
} while (true);
}
Издатель
public class Publisher
{
const string ExchangeType = "topic";
Dictionary<ulong, string> unConfirmedMessageTags = new Dictionary<ulong, string>();
public void SendMessageToBroker(string host, string exchangeName, string routingKey, string message)
{
var factory = new ConnectionFactory() { HostName = host };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.BasicAcks += (sender, ea) => OnBasicAcks(ea.Multiple, ea.DeliveryTag);
channel.BasicNacks += (sender, ea) => OnBasicNacks(ea.Multiple, ea.DeliveryTag);
channel.ConfirmSelect();
channel.ExchangeDeclare(exchangeName, ExchangeType);
var body = Encoding.UTF8.GetBytes(message);
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
unConfirmedMessageTags.TryAdd(channel.NextPublishSeqNo, message);
channel.BasicPublish(exchange: exchangeName,
routingKey: routingKey,
basicProperties: properties,
body: body);
Console.WriteLine(" [x] Sent {0}", message);
}
}
private void OnBasicNacks(bool multiple, ulong deliveryTag)
{
if (multiple)
{
Console.WriteLine("Messages with delivery tag LESS THAN {0} have been LOST and must be resent.", deliveryTag);
}
else
{
Console.WriteLine("Message with delivery tag {0} has been LOST and must be resent.", deliveryTag);
}
}
private void OnBasicAcks(bool multiple, ulong deliveryTag)
{
if (multiple)
{
var confirmed = unConfirmedMessageTags.Where(k => k.Key <= deliveryTag);
foreach (var entry in confirmed)
{
unConfirmedMessageTags.Remove(entry.Key);
Console.WriteLine("Message with delivery tag {0} has been confirmed and deleted.", entry.Key);
}
}
else
{
unConfirmedMessageTags.Remove(deliveryTag);
Console.WriteLine("Message with delivery tag {0} has been confirmed and deleted.", deliveryTag);
}
}
}
}
Получить
static void Main(string[] args)
{
const string ExchangeName = "main";
const string QueueName = "q1";
const string ExchangeType = "topic";
const string RoutingKey = "user.update";
var factory = new ConnectionFactory() { HostName = "localhost" };
using(var connection = factory.CreateConnection())
using(var channel = connection.CreateModel())
{
channel.ExchangeDeclare(ExchangeName, ExchangeType);
channel.QueueDeclare(queue: QueueName,
durable: true,
autoDelete: false,
exclusive: false,
arguments: null);
channel.QueueBind(QueueName, ExchangeName, RoutingKey);
//channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) => Basic_Ack(channel, ea.DeliveryTag, ea.Body);
channel.BasicConsume(queue: QueueName, autoAck: false, consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
private static void Basic_Ack(IModel channel, ulong deliveryTag, ReadOnlyMemory<byte> body)
{
var message = Encoding.UTF8.GetString(body.ToArray());
Console.WriteLine(" [x] Received {0}", message);
Thread.Sleep(2000);
channel.BasicAck(deliveryTag: deliveryTag, multiple: false);
Console.WriteLine(" [x] Processed {0}", message);
}
}
Проблема в том, что OnBasicAcks в моей Send программа вызывается только один раз для первого сообщения.