Получить подтверждение издателя для одного сообщения - PullRequest
0 голосов
/ 02 июля 2018

У меня есть производитель, который будет ставить в очередь каждый раз, когда сообщение получено через вызов API, и я хочу вернуться только тогда, когда получу подтверждение, что сообщение было получено брокером.

Я узнал, как это сделать, через издателя подтверждает -

    using (var connection = factory.CreateConnection())
            {
                using(var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchangeName, "topic", true, false, null);
                    //This enables producer confirm
                    channel.ConfirmSelect();

                    var properties = channel.CreateBasicProperties();

                    properties.Persistent = true;

                    var body = Encoding.UTF8.GetBytes(message);

                    channel.BasicPublish(exchangeName, topic, properties, body);

                    channel.WaitForConfirms();                         

                    Console.WriteLine("I sent a message !", message);
                }
            }

Моя проблема в том, что я не хочу ждать ВСЕХ подтверждений, только те, которые связаны с этим конкретным сообщением. Я не хочу ограничивать это одним потоком / рабочим, и я не хочу ждать, пока все подтвердит.

Библиотека rabbit для js работает с обратным вызовом, который идеально подходит для моего использования - но версия C #, похоже, не поддерживает его.

Ответы [ 2 ]

0 голосов
/ 17 июня 2019

Что вам нужно, это Издатель подтверждает и транзакции.

Разделите сообщение, для которого требуется подтверждение, на одну (или подмножество) транзакций.

Транзакция:

ch.txSelect(); <-- start transaction
ch.basicPublish("", QUEUE_NAME,
                            MessageProperties.PERSISTENT_BASIC,
                            "nop".getBytes());
ch.txCommit();<--commit transaction

Вы можете использовать потоковое облегченное подтверждение издателя, используя:

ch.setConfirmListener(new ConfirmListener() {
    public void handleAck(long seqNo, boolean multiple) {

if (multiple) {
    unconfirmedSet.headSet(seqNo+1).clear();
} else {
unconfirmedSet.remove(seqNo);
}
}
    public void handleNack(long seqNo, boolean multiple) {
        // handle the lost messages somehow
    }

Надеюсь, это поможет

0 голосов
/ 03 июля 2018

Моя проблема в том, что я не хочу ждать ВСЕХ подтверждений, только те, которые связаны с этим конкретным сообщением.

Вы должны подписаться на обратный вызов BasicAcks и использовать его для сопоставления подтверждений с опубликованными вами сообщениями.

Я не хочу ограничивать это одним потоком / работником

Вы можете поделиться этим соединением между потоками, но вам нужно будет создать экземпляр для каждого потока IModel.


ПРИМЕЧАНИЕ: команда RabbitMQ отслеживает список рассылки rabbitmq-users и только иногда отвечает на вопросы о StackOverflow.

...