два источника информации:
http://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
Вы действительно должны сначала попытаться понять примеры.
Полезные операции для понимания:
- Объявление / утверждение / прослушивание / подписка / публикация
Re: ваш вопрос -- нет никаких причин, почему вы не можете иметь несколько слушателей.Или вы можете подписаться на n путей маршрутизации с одним получателем на обмене.
** re: неблокирующее **
Типичный получатель принимает сообщения по одному за раз.Вы можете вытащить их из очереди, или они будут автоматически размещены рядом с потребителем в «оконном» режиме (определяемом параметрами качества обслуживания Qos).Прелесть этого подхода в том, что для вас проделана большая тяжелая работа (надежность, гарантированная доставка и т. Д.).
Ключевой особенностью RabbitMQ является то, что если при обработке возникает ошибка, то сообщение повторно добавляется обратно в очередь (функция отказоустойчивости).
Необходимо знать большео вашей ситуации.
Часто, если вы публикуете список, о котором я упоминал выше, вы можете найти кого-нибудь из сотрудников RabbitMQ.Они очень полезны.
Надеюсь, это немного поможет.Поначалу много чего стоит задуматься, но с этим стоит продолжать.
Q & A
см .: http://www.rabbitmq.com/faq.html
Q.Можно ли подписаться на несколько очередей, используя новую подписку (channel, queueName)?
Да.Вы используете либо ключ привязки, например, abc. *. Hij, либо abc. #. Hij, либо прикрепляете несколько привязок.Первый предполагает, что вы разработали свои ключи маршрутизации на основе какого-то принципа, который имеет смысл для вас (см. Ключи маршрутизации в FAQ).Для последнего вам необходимо привязать к нескольким очередям.
Реализация n-привязок вручную.см .: http://hg.rabbitmq.com/rabbitmq-dotnet-client/file/default/projects/client/RabbitMQ.Client/src/client/messagepatterns/Subscription.cs
за этим шаблоном не так много кода, поэтому вы можете свернуть свой собственный шаблон подписки, если подстановочных знаков недостаточно.вы можете наследовать от этого класса и добавить другой метод для дополнительных привязок ... возможно, это будет работать или что-то похожее (не проверено).
Спецификация AQMP говорит, что возможна множественная ручная привязка: http://www.rabbitmq.com/amqp-0-9-1-reference.html#queue.bind
Q.И если так, как я могу пройти через все подписанные очереди и вернуть сообщение (ноль, когда нет сообщений)?
С подписчиком вы будете уведомлены, когда сообщение будет доступно.В противном случае вы описываете интерфейс извлечения, в котором вы отправляете сообщение по запросу.Если нет доступных сообщений, вы получите нулевое значение.Кстати: метод Notify, вероятно, более удобен.
Q.Ох, и учтите, что у меня все эти операции выполняются разными способами.Я отредактирую свой пост, чтобы отразить код
Живой код:
эта версия должна использовать подстановочные знаки для подписки на более чем один ключ маршрутизации
n Ключи ручной маршрутизации с использованием подписки оставлены в качестве упражнения для читателя.;-) Я думаю, вы все равно склонялись к интерфейсу pull.Интерфейсы btw: pull менее эффективны, чем уведомляющие.
using (Subscription sub = new Subscription(ch, QueueNme))
{
foreach (BasicDeliverEventArgs ev in sub)
{
Process(ev.Body);
...
Примечание: foreach использует IEnumerable, а IEnumerable переносит событие, что новое сообщение поступило через оператор yield,Фактически это бесконечный цикл.
--- ОБНОВЛЕНИЕ
AMQP был разработан с идеей сохранения количества соединений TCP на уровне приложений, что означает, что вы можете иметьмного каналов на соединение.
код в этом вопросе (правка 3) пытается использовать двух подписчиков с одним каналом, тогда как он должен (я считаю) быть одним подписчиком на канал на поток, чтобы избежать проблем с блокировкой.Sugestion: используйте ключ маршрутизации "подстановочный знак".С помощью java-клиента можно подписаться на несколько разных имен очереди, но, насколько мне известно, клиент .net не реализовал это в вспомогательном классе подписчика.
Если вам действительно нужны два разных имени очереди в одном и том же потоке подписки, то для .net предлагается следующая последовательность извлечения:
using (IModel ch = conn.CreateModel()) { // btw: no reason to close the channel afterwards IMO
conn.AutoClose = true; // no reason to closs the connection either. Here for completeness.
ch.QueueDeclare(queueName);
BasicGetResult result = ch.BasicGet(queueName, false);
if (result == null) {
Console.WriteLine("No message available.");
} else {
ch.BasicAck(result.DeliveryTag, false);
Console.WriteLine("Message:");
}
return 0;
}
- ОБНОВЛЕНИЕ 2:
из списка RabbitMQ:
"Предположим, что element.Next () блокирует одну из подписок. Вы можете получать доставки из каждой подписки с тайм-аутом для чтения после нее. В качестве альтернативы вы можете настроить однуочередь для получения всех измерений и извлечения из нее сообщений по одной подписке. "(Эмиль)
Это означает, что, когда первая очередь пуста, .Next () блокирует ожидание появления следующего сообщения.т. е. у подписчика есть встроенное сообщение ожидания следующего сообщения.
- ОБНОВЛЕНИЕ 3:
в .net, используйте QueueingBasicConsumer для потребления из нескольких очередей.
На самом деле, есть ветка об этом, чтобы почувствовать использование:
Ожидание одного сообщения RabbitMQ с таймаутом
- UPDATE4:
дополнительная информация о .QueueingBasicConsumer
Здесь приведен пример кода.
http://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.QueueingBasicConsumer.html
пример скопирован в ответ с несколькими изменениями (см. // <----). </p>
IModel channel = ...;
QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume(queueName, false, null, consumer); //<-----
channel.BasicConsume(queueName2, false, null, consumer); //<-----
// etc. channel.BasicConsume(queueNameN, false, null, consumer); //<-----
// At this point, messages will be being asynchronously delivered,
// and will be queueing up in consumer.Queue.
while (true) {
try {
BasicDeliverEventArgs e = (BasicDeliverEventArgs) consumer.Queue.Dequeue();
// ... handle the delivery ...
channel.BasicAck(e.DeliveryTag, false);
} catch (EndOfStreamException ex) {
// The consumer was cancelled, the model closed, or the
// connection went away.
break;
}
}
- ОБНОВЛЕНИЕ 5: простое получение, которое будет действовать в любой очереди (более медленный, но иногда более удобный метод).
ch.QueueDeclare(queueName);
BasicGetResult result = ch.BasicGet(queueName, false);
if (result == null) {
Console.WriteLine("No message available.");
} else {
ch.BasicAck(result.DeliveryTag, false);
Console.WriteLine("Message:");
// deserialize body and display extra info here.
}