NetMQ Опрос сокета повторения с таймаутом в цикле - PullRequest
0 голосов
/ 29 апреля 2019

Я пытаюсь перенести свой код из устаревшей библиотеки CastleMQ в NetMQ, но у меня возникают некоторые проблемы.

Я предпочитаю использовать опрос с тайм-аутом для надежности - я только что обнаружил, что он работает лучше для меня методом проб и ошибок, чем простое блокирование порта на неопределенный срок.

вот мой код CastleMQ


    public int ZeroPort;

    private void ThreadProc()
    { 
        var ctx = new Context();
        try {
            using (var repSocket = ctx.CreateSocket(SocketType.Rep))
            {
                string bindAddress = "tcp://*:"+ZeroPort;
                repSocket.Bind(bindAddress);
                print2("*** BINDING on {0} ***", bindAddress);

                bool quit = false;
                while (!quit) {
                    try {
                        var polling = new Polling(PollingEvents.RecvReady, repSocket);
                        polling.RecvReady += (socket) =>
                        { // using socket.Recv() here is guaranted to return stuff
                            var msg = socket.Recv();
                            var msgStr = Encoding.UTF8.GetString(msg);
                            print2("[REP:{0}] {1}", bindAddress, msgStr);

                            switch (msgStr) {
                                case "positions": {
                                    StringBuilder csv = new StringBuilder();
                                    print2("csv: {0}", csv.ToString());
                                    socket.Send(csv.ToString());
                                    break;
                                }

                                default: {
                                    socket.Send("Unrecognized Command: " + msgStr);
                                    break;
                                }
                            }
                        };

                        polling.Poll(POLL_TIMEOUT_MS); // this returns once some socket event happens

                    } catch (Exception e) {
                        if (e is ThreadAbortException) {
                            quit = true;
                            print2("\n*** EXITED ***");
                        } else print2(e.ToString());
                    }
                }
            }

        } catch (Exception e) {
            print2(e.ToString());
        } finally {
            ctx.Dispose();
        }
    }    

вот что я пытался сделать, а затем потерял с NetMQ

    private void ThreadProc()
    {   
        try {
            string bindAddress = "@tcp://*:" + ZeroPort;
            print2("*** BINDING on {0} ***", bindAddress);
            using (var repSocket = new ResponseSocket(bindAddress))
            using (var poller = new NetMQPoller { repSocket })
            {
                //                    bool quit = false;
                //                  while (!quit)

                // these event will be raised by the Poller
                repSocket.ReceiveReady += (s, a) =>
                {
                    // receive won't block as a message is ready
                    string msg = a.Socket.ReceiveString(); // defeinition for ReceiveString() can't be found
                    // send a response
                    a.Socket.Send("Response"); // it doesn't like "Response", do I need to wrap it in some object?

Меня особенно смущает, как добавить тайм-аут, чтобы я мог опрашивать тайм-аут в цикле, как это делает мой код CastleMQ.

Любые указатели будут высоко оценены, спасибо

...