Шаблон актера NetMQ - PullRequest
       44

Шаблон актера NetMQ

0 голосов
/ 03 ноября 2019

Я пытаюсь использовать одно приложение WPF для связи с другим приложением, когда что-то меняется в пользовательском интерфейсе. Например: изменяется ползунок, и я отправляю новое значение или изменение ввода текстового поля, и я отправляю новое значение ввода.

Второе приложение прослушивает и получает эти изменения для обновления различных свойств.

Для этого я использую NetMQ, следуя примеру шаблона Actor из документации: https://netmq.readthedocs.io/en/latest/actor/

Прямо сейчас у меня есть класс клиент / сервер:

Сервер:

public class NetMQServer
{
    public class ShimHandler : IShimHandler
    {
        private PairSocket shim;
        private NetMQPoller poller;
        private PublisherSocket publisher;
        private string Address;

        public ShimHandler(string address)
        {
            Address = address;
        }

        public void Initialise(object state)
        {
        }

        public void Run(PairSocket shim)
        {
            using (publisher = new PublisherSocket())
            {
                publisher.Bind(Address);
                publisher.Options.SendHighWatermark = 1000;
                this.shim = shim;
                shim.ReceiveReady += OnShimReady;
                shim.SignalOK();
                poller = new NetMQPoller { shim, publisher };
                poller.Run();
            }
        }

        private void OnShimReady(object sender, NetMQSocketEventArgs e)
        {
            string command = e.Socket.ReceiveFrameString();

            if(command == NetMQActor.EndShimMessage)
            {
                poller.Stop();
                return;
            }
            else
            {
                byte[] byteMessage = e.Socket.ReceiveFrameBytes();
                publisher.SendMoreFrame(command).SendFrame(byteMessage);
            }
        }

        private void UpdateString(string stringmessage, string propertyToUpdate)
        {
            propertyToUpdate = stringmessage;
        }
    }

    public NetMQServer(string ip, int port)
    {
        IP = ip;
        Port = port;
        Serializer = new CerasSerializer();
    }

    public CerasSerializer Serializer { get; set; }
    private NetMQActor actor;

    private string _name;
    public string Name
    {
        get { return _name; }
        set { _name = value;}
    }

    private int _port;
    public int Port
    {
        get { return _port; }
        set
        {
            _port = value;
            ReStart();
        }
    }

    private string _ip;
    public string IP
    {
        get { return _ip; }
        set
        {
            _ip = value;
            ReStart();
        }
    }

    public string Address
    {
        get { return String.Format("tcp://{0}:{1}", IP, Port); }
    }

    public void Start()
    {
        if (actor != null)
            return;
        actor = NetMQActor.Create(new ShimHandler(Address));
    }

    public void Stop()
    {
        if (actor != null)
        {
            actor.Dispose();
            actor = null;
        }
    }

    public void ReStart()
    {
        if (actor == null)
            return;
        Stop();
        Start();
    }

    public void SendObject(string topic, object commandParameter)
    {
        if (actor == null)
            return;

        byte[] Serialized = Serializer.Serialize(commandParameter);
        var message = new NetMQMessage();
        message.Append(topic);
        message.Append(Serialized);
        actor.SendMultipartMessage(message);
    }
}

Здесь, когда свойство элемента управления изменяется, я вызываю функцию SendStringMessage(string stringToSend), и она отправляется сNetMQ издатель (строка предназначена только для тестирования, я мог бы также отправить любой объект в виде байта).

В любом случае, вот клиент, работающий во втором приложении:

public class NetMQClient
{
    public class ShimHandler : IShimHandler
    {
        private PairSocket shim;
        private NetMQPoller poller;
        private SubscriberSocket subscriber;
        private ByteMessage ByteMessage;
        private string Address;
        private string Topic;
        private string MessageType;

        public ShimHandler(ByteMessage byteMessage, string address, string topic)
        {
            this.ByteMessage = byteMessage;
            this.Address = address;
            this.Topic = topic;
        }

        public void Initialise(object state)
        {
        }

        public void Run(PairSocket shim)
        {
            using (subscriber = new SubscriberSocket())
            {
                subscriber.Connect(Address);
                subscriber.Subscribe(Topic);
                subscriber.Options.ReceiveHighWatermark = 1000;
                subscriber.ReceiveReady += OnSubscriberReady;
                this.shim = shim;
                shim.ReceiveReady += OnShimReady;
                shim.SignalOK();
                poller = new NetMQPoller { shim, subscriber };
                poller.Run();
            }
        }

        private void OnSubscriberReady(object sender, NetMQSocketEventArgs e)
        {
            string topic = e.Socket.ReceiveFrameString();
            if (topic == Topic)
            {
                this.ByteMessage.Message = e.Socket.ReceiveFrameBytes();
            }
        }

        private void OnShimReady(object sender, NetMQSocketEventArgs e)
        {
            string command = e.Socket.ReceiveFrameString();
            if (command == NetMQActor.EndShimMessage)
            {
                poller.Stop();
            }
        }
    }

    public ByteMessage ByteMessage { get; set; }
    private NetMQActor actor;

    private string _command;
    public string Command
    {
        get { return _command; }
        set { _command = value;}
    }

    private string _topic;
    public string Topic
    {
        get { return _topic; }
        set
        {
            _topic = value;
            ReStart();
        }
    }

    private int _port;
    public int Port
    {
        get { return _port; }
        set
        {
            _port = value;
            ReStart();
        }
    }

    private string _ip;
    public string IP
    {
        get { return _ip; }
        set
        {
            _ip = value;
            ReStart();
        }
    }

    public string Address
    {
        get { return String.Format("tcp://{0}:{1}", IP, Port); }
    }

    public NetMQClient(string ip, int port, string topic)
    {
        ByteMessage = new ByteMessage();
        IP = ip;
        Port = port;
        Topic = topic;
    }

    public void Start()
    {
        if (actor != null)
            return;
        actor = NetMQActor.Create(new ShimHandler(ByteMessage, Address, Topic));
    }

    public void Stop()
    {
        if (actor != null)
        {
            actor.Dispose();
            actor = null;
        }
    }

    public void ReStart()
    {
        if (actor == null)
            return;

        Stop();
        Start();
    }
}

Теперь ByteMessageкласс выглядит просто так:

public class ByteMessage : INotifyPropertyChanged
{
    public ByteMessage()
    {

    }

    private byte[] _message;
    public byte[] Message
    {
        get { return _message; }
        set
        {
            _message = value;
            OnPropertyChanged("Message");
        }
    }

    public event PropertyChangedEventHandler PropertyChanged;
    private void OnPropertyChanged(string propertyName)
    {
        if (PropertyChanged != null) PropertyChanged(this, new PropertyChangedEventArgs(propertyName));
    }
}

Одно второе приложение, классы будут иметь свойство NetMQClient в качестве свойства и регистрироваться в событии OnPropertyChange объекта NetMQClient ByteMessage. Теперь, когда ByteMessage.Message обновляется с ShimHandler, я могу десериализовать данные и что-то с ними сделать.

Пока это работает, но я действительно не уверен, правильно ли я делаю это .. ... и если это ThreadSafe

Не лучше ли было бы, чтобы у каждого класса в приложении Listening был свой NetMQClient для прослушивания определенной темы?

Действительно запутываете здесь, как использоватьвсе это, даже после прочтения всех примеров, представленных вокруг NetMQ.

Спасибо

...