RabbitMQ и Сериализация странная ошибка - PullRequest
3 голосов
/ 12 июля 2011

У меня есть два приложения: app1.cs и app2.cs (код приведён ниже). Кроме того, у меня есть DLL, собранная из файла refer.cs (код также ниже). Когда я компилирую и запускаю app1.cs (который отправляет объект измерения), я получаю следующее исключение:

Unhandled Exception: RabbitMQ.Client.Exceptions.OperationInterruptedException

Я не понимаю, почему прерывается соединение. Видите ли вы, в чём причина проблемы?

С уважением, Demi

//refer.cs from which refer.dll is created

using System;
using System.IO;
using System.Collections.Generic;
using System.Runtime.Serialization;
using System.Runtime.Serialization.Formatters.Binary;

namespace refer
{
    //start alternate serialization
    /// <summary>
    /// Converts <see cref="Measurement"/> objects to and from byte arrays using
    /// <see cref="BinaryFormatter"/>.
    /// </summary>
    public static class AltSerialization
    {
        /// <summary>Serializes a measurement into a byte array.</summary>
        /// <param name="m">The measurement to serialize.</param>
        /// <returns>Exactly the bytes written by the formatter, without trailing padding.</returns>
        public static byte[] AltSerialize(Measurement m)
        {
            using (var ms = new MemoryStream())
            {
                BinaryFormatter bf = CreateFormatter();
                bf.Serialize(ms, m);
                // ToArray copies only the bytes actually written. GetBuffer hands back the
                // stream's whole internal buffer, including unused capacity past the data,
                // which would be sent along as part of the message body.
                return ms.ToArray();
            }
        }

        /// <summary>Rebuilds a measurement from bytes produced by <see cref="AltSerialize"/>.</summary>
        /// <param name="seriM">The serialized measurement.</param>
        /// <returns>The deserialized measurement.</returns>
        public static Measurement AltDeSerialize(byte[] seriM)
        {
            // NOTE(review): BinaryFormatter.Deserialize is unsafe on untrusted input; these
            // bytes appear to come from a message queue - confirm every publisher is trusted
            // or move to a safer wire format.
            using (var stream = new MemoryStream(seriM))
            {
                BinaryFormatter bf = CreateFormatter();
                return (Measurement)bf.Deserialize(stream);
            }
        }

        // Builds a formatter with the simple assembly style, used for both directions.
        private static BinaryFormatter CreateFormatter()
        {
            var bf = new BinaryFormatter();
            bf.AssemblyFormat = System.Runtime.Serialization.Formatters.FormatterAssemblyStyle.Simple;
            return bf;
        }
    }
    //end alternate serialization

    /// <summary>
    /// A measurement sample (id, timestamp and value) with custom serialization.
    /// </summary>
    /// <remarks>
    /// Only <c>time</c> and <c>value</c> are written by <see cref="GetObjectData"/>, so
    /// <c>id</c> is not restored by the deserialization constructor.
    /// </remarks>
    [Serializable] // Marks the class as serializable
    public class Measurement : ISerializable
    {
        // Not part of the serialized data: GetObjectData never writes it.
        [NonSerialized] public int id;
        public int time; //timestamp
        public double value;

        /// <summary>Creates a measurement with id 1, time 12 and value 0.01.</summary>
        public Measurement()
        {
            id = 1;
            time = 12;
            value = 0.01;
        }

        /// <summary>Creates a measurement from explicit field values.</summary>
        /// <param name="_id">Value for <c>id</c>.</param>
        /// <param name="_time">Value for <c>time</c>.</param>
        /// <param name="_value">Value for <c>value</c>.</param>
        public Measurement(int _id, int _time, double _value)
        {
            id = _id;
            time = _time;
            value = _value;
        }

        /// <summary>
        /// Deserialization constructor: restores <c>time</c> and <c>value</c> from the
        /// entries written by <see cref="GetObjectData"/>.
        /// </summary>
        /// <param name="info">Holds the serialized name/value pairs.</param>
        /// <param name="ctxt">Serialization context (not used).</param>
        /// <exception cref="ArgumentNullException"><paramref name="info"/> is null.</exception>
        public Measurement(SerializationInfo info, StreamingContext ctxt)
        {
            if (info == null)
            {
                throw new ArgumentNullException("info");
            }

            Console.WriteLine("DeSerialization construtor called.");
            // Names must match the ones used in GetObjectData.
            time = info.GetInt32("MeasurementTime");
            value = info.GetDouble("MeasurementValue");
        }

        /// <summary>Writes <c>time</c> and <c>value</c> into <paramref name="info"/>.</summary>
        /// <param name="info">Receives the name/value pairs.</param>
        /// <param name="ctxt">Serialization context (not used).</param>
        /// <exception cref="ArgumentNullException"><paramref name="info"/> is null.</exception>
        public void GetObjectData(SerializationInfo info, StreamingContext ctxt)
        {
            if (info == null)
            {
                throw new ArgumentNullException("info");
            }

            // Custom name-value pairs; values must be read back with the same names.
            info.AddValue("MeasurementTime", time);
            info.AddValue("MeasurementValue", value);
        }
    }
}

// MB1.cs

using System;
using System.IO;
using System.Collections.Generic;
using System.Runtime.Serialization;
using System.Runtime.Serialization.Formatters.Binary;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using UtilityMeasurement;

/// <summary>
/// Operations offered by a messaging back end (implemented in this file by Rabbit and Zmq).
/// </summary>
public interface IMessageBus
{
    /// <summary>Name of the messaging system.</summary>
    string MsgSys { get; set; }

    /// <summary>Registers a queue as a target for published messages.</summary>
    void publish(string queue);

    /// <summary>Registers a queue as a source of messages to read.</summary>
    void subscribe(string queue);

    /// <summary>Sends a measurement.</summary>
    void write(Measurement m1);

    /// <summary>Receives a measurement.</summary>
    Measurement read();
}

/// <summary>
/// IMessageBus implementation that sends and receives Measurement messages through
/// RabbitMQ queues on "localhost".
/// </summary>
/// <remarks>
/// One connection and one channel are created on first use and shared by all operations.
/// Call <see cref="Dispose"/> to close them.
/// </remarks>
public class Rabbit : IMessageBus, IDisposable
{
    private readonly List<string> publishQ = new List<string>();
    private readonly List<string> subscribeQ = new List<string>();

    // Kept as fields so every method can reach them; the factory used to be a constructor
    // local and the channel a local of write(), which left the other methods without them.
    private readonly ConnectionFactory factory;
    private IConnection connection;
    private IModel channel;

    // Static, so the name is shared by every Rabbit instance.
    public static string MsgSysName;

    /// <summary>Creates the bus; no connection is opened until the first operation.</summary>
    /// <param name="_msgSys">Name stored in <see cref="MsgSys"/>.</param>
    public Rabbit(string _msgSys) //Constructor
    {
        factory = new ConnectionFactory();
        factory.HostName = "localhost";

        System.Console.WriteLine("\nMsgSys: RabbitMQ");
        MsgSys = _msgSys;
    }

    /// <summary>Name of the messaging system, backed by the static <see cref="MsgSysName"/>.</summary>
    public string MsgSys
    {
        get { return MsgSysName; }
        set { MsgSysName = value; }
    }

    /// <summary>Declares the queue and adds it to the list that <see cref="write"/> sends to.</summary>
    /// <param name="queueName">Name of the queue to publish to.</param>
    public void publish(string queueName)
    {
        EnsureChannel().QueueDeclare(queueName, true, false, false, null); //durable=true
        publishQ.Add(queueName);
    }

    /// <summary>Declares the queue and adds it to the list that <see cref="read"/> consumes from.</summary>
    /// <param name="queueName">Name of the queue to consume from.</param>
    public void subscribe(string queueName)
    {
        EnsureChannel().QueueDeclare(queueName, true, false, false, null);
        subscribeQ.Add(queueName);
    }

    /// <summary>Serializes the measurement and sends it to every queue added via <see cref="publish"/>.</summary>
    /// <param name="m1">The measurement to send.</param>
    public void write ( Measurement m1 )
    {
        // NOTE(review): the refer.cs listing declares AltSerialize on AltSerialization, not on
        // Measurement - confirm against the assembly that is actually referenced.
        byte[] body = Measurement.AltSerialize( m1 );

        // Reuse the shared channel instead of opening a connection per call that is never closed.
        IModel model = EnsureChannel();
        foreach (string queue in publishQ)
        {
            // Empty exchange name, with the queue name as the routing key.
            model.BasicPublish("", queue, null, body);
            Console.WriteLine("\n  [x] Sent to queue {0}.", queue);
        }
    }

    /// <summary>
    /// Starts consuming from every queue added via <see cref="subscribe"/> and returns the
    /// first delivery, deserialized.
    /// </summary>
    /// <returns>The received measurement.</returns>
    public Measurement read()
    {
        IModel model = EnsureChannel();
        QueueingBasicConsumer consumer = new QueueingBasicConsumer(model);
        foreach (string queue in subscribeQ)
        {
            model.BasicConsume(queue, true, consumer);
        }
        System.Console.WriteLine(" [*] Waiting for messages." +
                                "To exit press CTRL+C");
        BasicDeliverEventArgs ea =
            (BasicDeliverEventArgs)consumer.Queue.Dequeue();
        // NOTE(review): same question as in write() about where AltDeSerialize is declared.
        return Measurement.AltDeSerialize(ea.Body);
    }

    /// <summary>Closes the channel and the connection if they were opened.</summary>
    public void Dispose()
    {
        if (channel != null)
        {
            channel.Dispose();
            channel = null;
        }
        if (connection != null)
        {
            connection.Dispose();
            connection = null;
        }
    }

    // Opens the connection and channel on first use and returns the shared channel.
    private IModel EnsureChannel()
    {
        if (connection == null)
        {
            connection = factory.CreateConnection();
        }
        if (channel == null)
        {
            channel = connection.CreateModel();
        }
        return channel;
    }
}

/// <summary>
/// Stub message bus: publish, subscribe and write do nothing, and read returns null.
/// </summary>
public class Zmq : IMessageBus
{
    // Static, so the name is shared by every Zmq instance.
    public static string MsgSysName;

    /// <summary>Prints "ZMQ" and stores the messaging system name.</summary>
    /// <param name="_msgSys">Name stored in <see cref="MsgSys"/>.</param>
    public Zmq(string _msgSys) //Constructor
    {
        System.Console.WriteLine("ZMQ");
        MsgSys = _msgSys;
    }

    /// <summary>Name of the messaging system, backed by the static <see cref="MsgSysName"/>.</summary>
    public string MsgSys
    {
        get { return MsgSysName; }
        set { MsgSysName = value; }
    }

    /// <summary>Not implemented; does nothing.</summary>
    public void publish(string queue)
    {
    }

    /// <summary>Not implemented; does nothing.</summary>
    public void subscribe(string queue)
    {
    }

    /// <summary>Not implemented; does nothing.</summary>
    public void write ( Measurement m1 )
    {
    }

    /// <summary>Not implemented; always returns null.</summary>
    public Measurement read()
    {
        return null;
    }
}

/// <summary>Creates the message bus implementation that matches a messaging system name.</summary>
public class MessageBusFactory
{
    /// <summary>Returns a new bus for the given name.</summary>
    /// <param name="MsgSysName">Either "Zmq" or "Rabbit" (exact match).</param>
    /// <returns>A new Zmq or Rabbit instance.</returns>
    /// <exception cref="ArgumentException">The name is neither "Zmq" nor "Rabbit".</exception>
    public static IMessageBus GetMessageBus(string MsgSysName)
    {
        if (MsgSysName == "Zmq")
        {
            return new Zmq(MsgSysName);
        }

        if (MsgSysName == "Rabbit")
        {
            return new Rabbit(MsgSysName);
        }

        string message = "Messaging type " + MsgSysName + " not supported.";
        throw new ArgumentException(message);
    }
}

/// <summary>Console entry point of the publisher: sends one test Measurement to two queues.</summary>
public class MainClass
{
    /// <summary>
    /// Asks for the messaging system and two queue names, then publishes a test measurement
    /// to both queues.
    /// </summary>
    public static void Main()
    {
        //Asks for the message system
        System.Console.WriteLine("\nEnter name of messageing system: ");
        System.Console.WriteLine("Usage: [Rabbit] [Zmq]");
        string MsgSysName = ReadRequiredLine();

        //Create a new Measurement message
        Measurement m1 = new Measurement(2, 2345, 23.456);

        //Instantiate the bus for the chosen messaging system (ex. Rabbit, Zmq, etc)
        IMessageBus obj1 = MessageBusFactory.GetMessageBus(MsgSysName);
        try
        {
            System.Console.WriteLine("\nA {0} object is now created.", MsgSysName);

            System.Console.WriteLine("With Test message:\n    ID: {0}", m1.id);
            System.Console.WriteLine("    Time: {0}", m1.time);
            System.Console.WriteLine("    Value: {0}", m1.value);

            // Ask queue name and store it
            System.Console.WriteLine("Enter a queue name to publish the message to: ");
            obj1.publish( ReadRequiredLine() );

            System.Console.WriteLine("Enter another queue name: ");
            obj1.publish( ReadRequiredLine() );

            // Write message to the queues
            obj1.write( m1 );
        }
        finally
        {
            // Release the bus before exiting when the implementation supports disposal.
            IDisposable disposable = obj1 as IDisposable;
            if (disposable != null)
            {
                disposable.Dispose();
            }
        }
    }

    // Reads one line from the console; fails with a clear error at end of input instead of
    // the NullReferenceException that calling ToString() on a null result would raise.
    private static string ReadRequiredLine()
    {
        string line = System.Console.ReadLine();
        if (line == null)
        {
            throw new InvalidOperationException("Unexpected end of input.");
        }
        return line;
    }
}

// MB2.cs

using System; 
using System.IO;
using System.Collections.Generic;
using System.Runtime.Serialization;
using System.Runtime.Serialization.Formatters.Binary;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using UtilityMeasurement;

/// <summary>
/// Operations offered by a messaging back end (implemented in this file by Rabbit and Zmq).
/// </summary>
public interface IMessageBus
{
    /// <summary>Name of the messaging system.</summary>
    string MsgSys { get; set; }

    /// <summary>Registers a queue as a target for published messages.</summary>
    void publish(string queue);

    /// <summary>Registers a queue as a source of messages to read.</summary>
    void subscribe(string queue);

    /// <summary>Sends a measurement.</summary>
    void write(Measurement m1);

    /// <summary>Receives a measurement.</summary>
    Measurement read();
}

/// <summary>
/// IMessageBus implementation that sends and receives Measurement messages through
/// RabbitMQ queues on "localhost".
/// </summary>
/// <remarks>
/// One connection and one channel are created on first use and shared by all operations.
/// Call <see cref="Dispose"/> to close them.
/// </remarks>
public class Rabbit : IMessageBus, IDisposable
{
    private readonly List<string> publishQ = new List<string>();
    private readonly List<string> subscribeQ = new List<string>();

    // Kept as fields so every method can reach them; the factory used to be a constructor
    // local and the channel a local of write(), which left the other methods without them.
    private readonly ConnectionFactory factory;
    private IConnection connection;
    private IModel channel;

    // Static, so the name is shared by every Rabbit instance.
    public static string MsgSysName;

    /// <summary>Creates the bus; no connection is opened until the first operation.</summary>
    /// <param name="_msgSys">Name stored in <see cref="MsgSys"/>.</param>
    public Rabbit(string _msgSys) //Constructor
    {
        factory = new ConnectionFactory();
        factory.HostName = "localhost";

        System.Console.WriteLine("\nMsgSys: RabbitMQ");
        MsgSys = _msgSys;
    }

    /// <summary>Name of the messaging system, backed by the static <see cref="MsgSysName"/>.</summary>
    public string MsgSys
    {
        get { return MsgSysName; }
        set { MsgSysName = value; }
    }

    /// <summary>Declares the queue and adds it to the list that <see cref="write"/> sends to.</summary>
    /// <param name="queueName">Name of the queue to publish to.</param>
    public void publish(string queueName)
    {
        EnsureChannel().QueueDeclare(queueName, true, false, false, null); //durable=true
        publishQ.Add(queueName);
    }

    /// <summary>Declares the queue and adds it to the list that <see cref="read"/> consumes from.</summary>
    /// <param name="queueName">Name of the queue to consume from.</param>
    public void subscribe(string queueName)
    {
        EnsureChannel().QueueDeclare(queueName, true, false, false, null);
        subscribeQ.Add(queueName);
    }

    /// <summary>Serializes the measurement and sends it to every queue added via <see cref="publish"/>.</summary>
    /// <param name="m1">The measurement to send.</param>
    public void write ( Measurement m1 )
    {
        // NOTE(review): the refer.cs listing declares AltSerialize on AltSerialization, not on
        // Measurement - confirm against the assembly that is actually referenced.
        byte[] body = Measurement.AltSerialize( m1 );

        // Reuse the shared channel instead of opening a connection per call that is never closed.
        IModel model = EnsureChannel();
        foreach (string queue in publishQ)
        {
            // Empty exchange name, with the queue name as the routing key.
            model.BasicPublish("", queue, null, body);
            Console.WriteLine("\n  [x] Sent to queue {0}.", queue);
        }
    }

    /// <summary>
    /// Starts consuming from every queue added via <see cref="subscribe"/> and returns the
    /// first delivery, deserialized.
    /// </summary>
    /// <returns>The received measurement.</returns>
    public Measurement read()
    {
        IModel model = EnsureChannel();
        QueueingBasicConsumer consumer = new QueueingBasicConsumer(model);
        foreach (string queue in subscribeQ)
        {
            model.BasicConsume(queue, true, consumer);
        }
        System.Console.WriteLine(" [*] Waiting for messages." +
                                "To exit press CTRL+C");
        BasicDeliverEventArgs ea =
            (BasicDeliverEventArgs)consumer.Queue.Dequeue();
        // NOTE(review): same question as in write() about where AltDeSerialize is declared.
        return Measurement.AltDeSerialize(ea.Body);
    }

    /// <summary>Closes the channel and the connection if they were opened.</summary>
    public void Dispose()
    {
        if (channel != null)
        {
            channel.Dispose();
            channel = null;
        }
        if (connection != null)
        {
            connection.Dispose();
            connection = null;
        }
    }

    // Opens the connection and channel on first use and returns the shared channel.
    private IModel EnsureChannel()
    {
        if (connection == null)
        {
            connection = factory.CreateConnection();
        }
        if (channel == null)
        {
            channel = connection.CreateModel();
        }
        return channel;
    }
}


/// <summary>
/// Stub message bus: publish, subscribe and write do nothing, and read returns null.
/// </summary>
public class Zmq : IMessageBus
{
    // Static, so the name is shared by every Zmq instance.
    public static string MsgSysName;

    /// <summary>Prints "ZMQ" and stores the messaging system name.</summary>
    /// <param name="_msgSys">Name stored in <see cref="MsgSys"/>.</param>
    public Zmq(string _msgSys) //Constructor
    {
        System.Console.WriteLine("ZMQ");
        MsgSys = _msgSys;
    }

    /// <summary>Name of the messaging system, backed by the static <see cref="MsgSysName"/>.</summary>
    public string MsgSys
    {
        get { return MsgSysName; }
        set { MsgSysName = value; }
    }

    /// <summary>Not implemented; does nothing.</summary>
    public void publish(string queue)
    {
    }

    /// <summary>Not implemented; does nothing.</summary>
    public void subscribe(string queue)
    {
    }

    /// <summary>Not implemented; does nothing.</summary>
    public void write ( Measurement m1 )
    {
    }

    /// <summary>Not implemented; always returns null.</summary>
    public Measurement read()
    {
        return null;
    }
}

/// <summary>Creates the message bus implementation that matches a messaging system name.</summary>
public class MessageBusFactory
{
    /// <summary>Returns a new bus for the given name.</summary>
    /// <param name="MsgSysName">Either "Zmq" or "Rabbit" (exact match).</param>
    /// <returns>A new Zmq or Rabbit instance.</returns>
    /// <exception cref="ArgumentException">The name is neither "Zmq" nor "Rabbit".</exception>
    public static IMessageBus GetMessageBus(string MsgSysName)
    {
        if (MsgSysName == "Zmq")
        {
            return new Zmq(MsgSysName);
        }

        if (MsgSysName == "Rabbit")
        {
            return new Rabbit(MsgSysName);
        }

        string message = "Messaging type " + MsgSysName + " not supported.";
        throw new ArgumentException(message);
    }
}

/// <summary>Console entry point of the subscriber: reads one Measurement from a queue and prints it.</summary>
public class MainClass
{
    /// <summary>
    /// Asks for the messaging system and a queue name, reads one measurement from that queue
    /// and prints its fields.
    /// </summary>
    public static void Main()
    {
        //Asks for the message system
        System.Console.WriteLine("\nEnter name of messageing system: ");
        System.Console.WriteLine("Usage: [Rabbit] [Zmq]");
        string MsgSysName = ReadRequiredLine();

        //Instantiate the bus for the chosen messaging system (ex. Rabbit, Zmq, etc)
        IMessageBus obj1 = MessageBusFactory.GetMessageBus(MsgSysName);
        try
        {
            System.Console.WriteLine("\nA {0} object is now created.", MsgSysName);

            System.Console.WriteLine("Enter a queue to subscribe to: ");
            string QueueName = ReadRequiredLine();
            obj1.subscribe( QueueName );

            //Read message into m2 (no placeholder object is needed beforehand)
            Measurement m2 = obj1.read();
            if (m2 == null)
            {
                // read() may return null (the Zmq stub always does); avoid dereferencing it.
                System.Console.WriteLine("\nNo message received from queue {0}.", QueueName);
                return;
            }

            m2.id = 11;
            System.Console.WriteLine("\nMessage received from queue {0}:\n    ID: {1}",QueueName, m2.id);
            System.Console.WriteLine("    Time: {0}", m2.time);
            System.Console.WriteLine("    Value: {0}", m2.value);
        }
        finally
        {
            // Release the bus before exiting when the implementation supports disposal.
            IDisposable disposable = obj1 as IDisposable;
            if (disposable != null)
            {
                disposable.Dispose();
            }
        }
    }

    // Reads one line from the console; fails with a clear error at end of input instead of
    // the NullReferenceException that calling ToString() on a null result would raise.
    private static string ReadRequiredLine()
    {
        string line = System.Console.ReadLine();
        if (line == null)
        {
            throw new InvalidOperationException("Unexpected end of input.");
        }
        return line;
    }
}

1 Ответ

2 голосов
/ 12 июля 2011

Я только что создал проект консольного приложения C # VS2010 vanilla с Refer.cs и App1.cs в том же проекте.

Я внес следующие изменения:

  • Добавлен RabbitMQ.Client.dll
  • Удалены атрибуты AssemblyVersion
  • Добавлен параметр string[] args в метод Main в App1.cs

Также я изменил:

factory.HostName = "localhost";

К этому:

factory.HostName = "192.168.56.101";

Это IP-адрес моей виртуальной машины VirtualBox с Ubuntu, на которой работает rabbitmq-сервер. Исключение не возникло, и сообщение было успешно получено сервером.

Все признаки указывают на проблему с конфигурацией вашего сервера. Я предполагаю, что ваш rabbitmq-сервер либо вообще не запущен, либо работает не на localhost, либо есть какая-то проблема с подключением к порту 5672.

...