Как сделать SimpleRpcClient.Call () блокирующим вызовом для достижения синхронной связи с RabbitMQ? - PullRequest
0 голосов
/ 10 мая 2011

В .NET-версии (2.4.1) RabbitMQ RabbitMQ.Client.MessagePatterns.SimpleRpcClient имеет метод Call () со следующими сигнатурами:

    public virtual object[] Call(params object[] args);
    public virtual byte[] Call(byte[] body);
    public virtual byte[] Call(IBasicProperties requestProperties, byte[] body, out IBasicProperties replyProperties);

Проблема:

При различных попытках метод по-прежнему продолжает не блокировать , где я ожидаю, поэтому он не может обработать ответ.

Вопрос:

Не упустил ли я что-то очевидное при настройке SimpleRpcClient или более ранней версии с IModel , IConnection или даже PublicationAddress ?

Подробнее:

Я также безуспешно пробовал различные конфигурации параметров метода QueueDeclare ().

string QueueDeclare(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary arguments);

НекоторыеБолее справочный код моей установки этих:

IConnection conn = new ConnectionFactory{Address = "127.0.0.1"}.CreateConnection());
using (IModel ch = conn.CreateModel())
{
     var client = new SimpleRpcClient(ch, queueName);
     var queueName = ch.QueueDeclare("t.qid", true, true, true, null);

     ch.QueueBind(queueName, "exch", "", null);

     //HERE: does not block?
     var replyMessageBytes = client.Call(prop, msgToSend, out replyProp);
}

Глядя в другом месте:

Или, скорее всего, есть проблема в моем "серверном" коде?С использованием и без использования BasicAck () кажется, что клиент уже продолжил выполнение.

1 Ответ

3 голосов
/ 11 мая 2011

- КОРОТКИЙ ОТВЕТ -

Бит " Вы делаете это неправильно " ...

Проверьте IBasicProperties , и вы должны использовать SimpleRpcServer с HandleSimpleCall ()

Если вы наткнулись на этот вопрос, вы либо выбрали неверный подход, как я, и, возможно, если бы сделали аналогичную ошибку, манипулируя IBasicProperties неумышленно, тем самым нанося ущерб способности SimpleRpcServer для правильной работы.

- ДОЛГО ОТВЕТ -

Рабочий образец для .NET: Найдите мой рабочий пример на BitBucket здесь:

https://bitbucket.org/NickJosevski/synchronous-rabbitmq-sample-.net

Или вот быстрый пример ...

Клиентская сторона:

IConnection conn = new ConnectionFactory{Address = "127.0.0.1"}.CreateConnection();
using (IModel ch = conn.CreateModel())
{
    ch.ExchangeDeclare(Helper.ExchangeName, "direct");

    var queueName = ch.EnsureQueue();

    var client = new SimpleRpcClient(ch, queueName);

    var msgToSend = new Message(/*data*/).Serialize();

    IBasicProperties replyProp;

    var reply = client.Call(new BasicProperties(), msgToSend, out replyProp);
}

Серверная часть:

IConnection conn = new ConnectionFactory{Address = "127.0.0.1"}.CreateConnection();
using (IModel ch = conn.CreateModel())
{
    ch.ExchangeDeclare(Helper.ExchangeName, "direct");
    var queuename = ch.EnsureQueue();

    var subscription = new Subscription(ch, queuename);

    new MySimpleRpcServerSubclass(subscription).MainLoop();
}

internal class MySimpleRpcServerSubclass : SimpleRpcServer
{
    public MySimpleRpcServerSubclass(Subscription subscription) 
        : base(subscription) { }

    public override byte[] HandleSimpleCall(
        bool isRedelivered, IBasicProperties requestProperties, 
        byte[] body, out IBasicProperties replyProperties)
    {
        replyProperties = requestProperties;
        replyProperties.MessageId = Guid.NewGuid().ToString();

        var m = Message.Deserialize(body);
        var r = 
            new Response
            {
                Message = String.Format("Got {0} with {1}", m.Name, m.Body)
            };

        return r.Serialize();
    }
}

Общий:

//helper:
public static string EnsureQueue(this IModel ch)
{
    var queueName = ch.QueueDeclare(QueueId, false, false, false, null);

    ch.QueueBind(queueName, ExchangeName, "", null);

    return queueName;
}

//NOTE: 
not all extension methods are explained here, such as *.Serialize()* 
as they're not relevant and just make for a cleaner example.
...