Сетевая симуляция с потоком памяти - PullRequest
0 голосов
/ 07 июня 2018

Мне нужно смоделировать сеть с узлами и MemoryStream, я хочу отправлять сообщения объектов от одного узла к другому.

Проблема в том, что я всегда получаю исключение, когда хочу десериализовать IMessage объект в потоке.Кроме того, как я могу имитировать server.AcceptClient?

Исключение:

System.Runtime.Serialization.SerializationException: "Входной поток не имеет допустимого двоичного формата. Начальное содержимое (в байтах): 05-01-00-00-00-22-50-65-65-72-5F-74-6F -5F-50-65-65 ... "

Код:

public class Network
{
    public List<Node> Nodes { get; set; }

    public Network()
    {
        Nodes = new List<Node>();
    }

    public MemoryStream GetClientStream(string ip)
    {
        foreach (var item in Nodes)
        {
            if (item.Ip == ip)
            {
                return item.Stream;
            }
        }

        return null;
    }
}


public class Node
{
    public string Ip { get; set; }

    public Client ChordClient { get; set; }

    public MemoryStream Stream { get; set; }

    public Network ChordChain { get; set; }

    public Node(string ip, string peerOne, Network network)
    {
        ChordChain = network;
        Ip = ip;
        Console.WriteLine($"Peer with Port {ip} is listening");
        Task.Run(() => openServer());
    }

    private void openServer()
    {
        try
        {
            Byte[] bytes = new Byte[256];

            Stream = new MemoryStream();
            // Enter the listening loop.
            while (true)
            {
                int i;

                while ((i = Stream.Read(bytes, 0, bytes.Length)) != 0)
                {
                    Stream.Position = 0;
                    IMessage msg = NodeTest.DeserializeFromStream(Stream);

                    msg.DoOperate();

                    Stream.Flush();
                }
            }
        }
        catch (SocketException e)
        {
            Console.WriteLine("SocketException: {0}", e);
        }
        finally
        {
            // Stop listening for new clients.
        }
    }

    public void Connect(String ip)
    {
        try
        {
            IMessage msg = new Message();

            MemoryStream stream = ChordChain.GetClientStream(ip);

            NodeTest.SerializeToStream(stream, msg);

            Console.WriteLine("Sent: {0}", "sd");
        }
        catch (ArgumentNullException e)
        {
            Console.WriteLine("ArgumentNullException: {0}", e);
        }
        catch (SocketException e)
        {
            Console.WriteLine("SocketException: {0}", e);
        }
        catch (Exception ex)
        {

        }


    }

}

public class NodeTest
{

    public static MemoryStream SerializeToStream(MemoryStream stream, object o)
    {
        IFormatter formatter = new BinaryFormatter();
        formatter.Serialize(stream, o);

        return stream;
    }

    public static IMessage DeserializeFromStream(MemoryStream stream)
    {

        stream.Position = 0;
        IFormatter formatter = new BinaryFormatter();
        stream.Seek(0, SeekOrigin.Begin);
        object o = formatter.Deserialize(stream);

        return (IMessage)(o);
    }
}

public class Message : IMessage
{

    public void DoOperate()
    {
        Console.WriteLine("Hello");
        Console.ReadKey();
    }

}

public interface IMessage
{
    void DoOperate();
}

Спасибо за вашу помощь, но теперь у меня есть проблема, как я могу отправить фиксированный размер объекта, который работает правильно

public class Node
{
    public string Ip { get; set; }

    public Client ChordClient { get; set; }

    public MemoryStream Stream { get; set; }

    public Network ChordChain {get; set; }

    public Node(string ip, string peerOne, Network network)
    {
        ChordChain = network;
        Ip = ip;
        Console.WriteLine($"Peer with Port {ip} is listening");
        Task.Run(() => openServer());
    }

    private void openServer()
    {
        try
        {
            NamedPipeServerStream server = new NamedPipeServerStream(Ip);
            Byte[] bytes = new Byte[256];
            int i;

            while (true)
            {
                server.WaitForConnection();

                while ((i = server.Read(bytes, 0, bytes.Length)) != 0)
                {

                    //Stream.Position = 0;
                    IMessage msg = NodeTest.DeserializeFromStream(server);

                    msg.DoOperate();
                }

            }

            //MemoryStream reader = new MemoryStream(server);
            //StreamWriter writer = new StreamWriter(server);

            //Byte[] bytes = new Byte[256];

            //Stream = new MemoryStream();
            //// Enter the listening loop.
            //while (true)
            //{
            //    int i;

            //    while ((i = Stream.Read(bytes, 0, bytes.Length)) != 0)
            //    {
            //        //Stream.Position = 0;
            //        IMessage msg = NodeTest.DeserializeFromStream(Stream);

            //        msg.DoOperate();

            //        Stream.Flush();
            //    }               
            //}
        }
        catch (SocketException e)
        {
            Console.WriteLine("SocketException: {0}", e);
        }
        finally
        {
            // Stop listening for new clients.
        }
    }

    public void Connect(String ip)
    {
        try
        {              
            IMessage msg = new Message();


            NamedPipeClientStream client = new NamedPipeClientStream(ip);
            client.Connect();

            NodeTest.SerializeToStream(client, msg);

            Console.WriteLine("Sent: {0}", "sd");

            client.Close();
        }
        catch (ArgumentNullException e)
        {
            Console.WriteLine("ArgumentNullException: {0}", e);
        }
        catch (SocketException e)
        {
            Console.WriteLine("SocketException: {0}", e);
        }
        catch(Exception ex)
        {

        }  
    }

}

public class NodeTest
{

    public static void SerializeToStream(NamedPipeClientStream stream, object o)
    { 
        IFormatter formatter = new BinaryFormatter();
        formatter.Serialize(stream, o);
    }

    public static IMessage DeserializeFromStream(NamedPipeServerStream   stream)
    {

        //stream.Position = 0;
        IFormatter formatter = new BinaryFormatter();
        //stream.Seek(0, SeekOrigin.Begin);
        object o = formatter.Deserialize(stream);

        return (IMessage)(o);
    }  
}

Ответы [ 2 ]

0 голосов
/ 07 июня 2018

Ваше использование MemoryStream не является потокобезопасным.Вы не можете перемещать и читать поток в одном потоке, пока другой пишет в него.Чтобы смоделировать этот тип асинхронной связи, используйте конвейерные потоки:

 NamedPipeServerStream server = new NamedPipeServerStream("MyPipe");
 server.WaitForConnection();
 StreamReader reader = new StreamReader(server);
 StreamWriter writer = new StreamWriter(server);

 [...]

 NamedPipeClientStream client = new NamedPipeClientStream("MyPipe");
 client.Connect();
 StreamReader reader = new StreamReader(client);
 StreamWriter writer = new StreamWriter(client);
0 голосов
/ 07 июня 2018

Я точно не знаю, что с этим не так, но у вас наверняка есть проблема, зная, есть ли у вас сообщение целиком или нет.

Если вы отправляете сообщения с такой сериализацией, то каждой стороне трудно узнать, получило ли оно все сообщение или нет.Чтобы решить эту проблему, нужно сформировать сериализованное сообщение, например, отправив:

...
[size][serialized message]
[size][serialized message]
...

Таким образом, чтобы отправить вам (1) сериализацию в память (2), получите размер как количество байтов (3).напишите размер (4) напишите, что много байтов

и чтобы получить вас (1) прочитайте размер (2) прочитайте ровно столько байтов в память (3) десериализуйте

я бы отсортировал этовышел первым.

...