Издатель RabbitMQ подтверждает обмен topi c - BasicAcks срабатывает только один раз в первый раз - PullRequest
0 голосов
/ 28 мая 2020

Используя обмен topi c, я хотел бы иметь шаблон обмена сообщениями публикации / подписки со следующими функциями:

  1. Have " Издатель подтверждает"реализовано.
  2. Потребитель должен подтверждать каждое сообщение после его обработки.
  3. Используйте ключи маршрутизации для маршрутизации сообщений на один или несколько потребителей.
  4. Иметь постоянные очереди потребителей, чтобы, если приложение-получатель временно не работает, оно могло забирать сообщения из своей очереди, когда оно возвращается.

Итак, я создал 2 консольных приложения (Отправка и Получение), чтобы проверить вышесказанное.

Отправить

    static void Main(string[] args)
    {

        Console.WriteLine(" Type [exit] to exit.");

        Publisher publisher = new Publisher();

        do
        {
            var userInput = Console.ReadLine();
            if (userInput == "exit")
            {
                break;
            }


            publisher.SendMessageToBroker("localhost", "main", "user.update", userInput);

        } while (true);
    }

Издатель

public class Publisher
{
    const string ExchangeType = "topic";

    Dictionary<ulong, string> unConfirmedMessageTags = new Dictionary<ulong, string>();

    public void SendMessageToBroker(string host, string exchangeName, string routingKey, string message)
    {

        var factory = new ConnectionFactory() { HostName = host };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {                
            channel.BasicAcks += (sender, ea) => OnBasicAcks(ea.Multiple, ea.DeliveryTag);
            channel.BasicNacks += (sender, ea) => OnBasicNacks(ea.Multiple, ea.DeliveryTag);

            channel.ConfirmSelect();

            channel.ExchangeDeclare(exchangeName, ExchangeType);

            var body = Encoding.UTF8.GetBytes(message);

            var properties = channel.CreateBasicProperties();
            properties.Persistent = true;

            unConfirmedMessageTags.TryAdd(channel.NextPublishSeqNo, message);

            channel.BasicPublish(exchange: exchangeName,
                routingKey: routingKey,
                basicProperties: properties,
                body: body);

            Console.WriteLine(" [x] Sent {0}", message);
        }
    }

    private void OnBasicNacks(bool multiple, ulong deliveryTag)
    {
        if (multiple)
        {
            Console.WriteLine("Messages with delivery tag LESS THAN {0} have been LOST and must be resent.", deliveryTag);
        }
        else
        {
            Console.WriteLine("Message with delivery tag {0} has been LOST and must be resent.", deliveryTag);
        }
    }

    private void OnBasicAcks(bool multiple, ulong deliveryTag)
    {
        if (multiple)
        {
            var confirmed = unConfirmedMessageTags.Where(k => k.Key <= deliveryTag);
            foreach (var entry in confirmed)
            {
                unConfirmedMessageTags.Remove(entry.Key);
                Console.WriteLine("Message with delivery tag {0} has been confirmed and deleted.", entry.Key);
            }

        }
        else
        {
            unConfirmedMessageTags.Remove(deliveryTag);
            Console.WriteLine("Message with delivery tag {0} has been confirmed and deleted.", deliveryTag);
        }

    }
}

}

Получить

    static void Main(string[] args)
    {
        const string ExchangeName = "main";
        const string QueueName = "q1";
        const string ExchangeType = "topic";
        const string RoutingKey = "user.update";

        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(ExchangeName, ExchangeType);

            channel.QueueDeclare(queue: QueueName, 
                durable: true, 
                autoDelete: false, 
                exclusive: false, 
                arguments: null);

            channel.QueueBind(QueueName, ExchangeName, RoutingKey);

            //channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

            var consumer = new EventingBasicConsumer(channel);

            consumer.Received += (model, ea) => Basic_Ack(channel, ea.DeliveryTag, ea.Body);

            channel.BasicConsume(queue: QueueName, autoAck: false, consumer: consumer);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }            
    }

    private static void Basic_Ack(IModel channel, ulong deliveryTag, ReadOnlyMemory<byte> body)
    {            
        var message = Encoding.UTF8.GetString(body.ToArray());
        Console.WriteLine(" [x] Received {0}", message);

        Thread.Sleep(2000);            

        channel.BasicAck(deliveryTag: deliveryTag, multiple: false);

        Console.WriteLine(" [x] Processed {0}", message);
    }
}

Проблема в том, что OnBasicAcks в моей Send программа вызывается только один раз для первого сообщения.

enter image description here

1 Ответ

0 голосов
/ 01 июня 2020

Для всех, кто может столкнуться с этой проблемой, я открывал соединение и канал (виртуальное соединение) для каждого publi sh, что не рекомендуется :

"Подключения означает долгоживущий . Открытие соединения для каждой операции (например, публикации сообщения) было бы очень неэффективным и крайне не рекомендуется . "

Также см. здесь :

"Подтверждения издателя включены на уровне канала с помощью метода ConfirmSelect ... Этот метод должен вызываться на каждом канале, который вы ожидаете использовать, подтверждается издателем. Подтверждения должны быть включены только один раз, а не для каждого опубликованного сообщения. "

Переход на использование долговременного соединения решил проблему для меня.

...