В ZMQ REQ -> Router - Dealer-REP одна рабочая нить блокирует другую рабочую нить? - PullRequest
0 голосов
/ 11 марта 2020

enter image description here

Как на рисунке:

Клиент: имеет несколько сокетов REP в нескольких потоках;

Сервер: ROUTER -> DEALER -> REP (поток работы);

Действие: клиент отправляет запросы в несколько потоков, рабочий поток обрабатывает запрос и отправляет ответ клиенту. Но если один рабочий поток будет заблокирован, другой рабочий поток тоже будет заблокирован. Почему?

код, указанный ниже:

Сервер: рабочий поток


    void* worker_routine(void* context, string name) {
        void* socket_ = zmq_socket(context, ZMQ_REP);
        zmq_connect(socket_, "inproc://workers");

        while(true) {
            //  Wait for next request from client
            char msg[1024];
            zmq_recv(socket_, msg, 1024, 0);
            string reply;

            string request = msg;
            if(request == "client1:hello") {
                //Doing something 
                mSleep(5000);//cost 5s
                reply = "client1:hello world!!!";
            }

            //  Send reply back to client
            LOGD(TAG, "%s:worker send reply", name.c_str());
            zmq_send(socket_, reply.c_str(), reply.length(), 0);
        }
        return (NULL);
    }

Сервер: главный

    int main(int argc, char** argv) {
        (void) argc;
        (void) argv;
        LOGD(TAG, "start");

        void* context = zmq_ctx_new();
        void* frontend = zmq_socket(context, ZMQ_ROUTER);
        zmq_bind(frontend, "ipc://@server:5555");

        void* backend = zmq_socket(context, ZMQ_DEALER);
        zmq_bind(backend, "inproc://workers");

        for(size_t i = 0; i < 5; i++) {
            string name = "woker" + std::to_string(i);
            thread worker(worker_routine, context, name);
            worker.detach();
        }

        zmq_proxy(frontend ,backend ,nullptr);

        zmq_close(frontend);
        zmq_close(backend);
        zmq_ctx_destroy(context);

        LOGD(TAG, "end");

        return 0;
    }

Клиент:

    void* worker_routine(void* context, string name) {
        void* clients = zmq_socket(context, ZMQ_REQ);
        zmq_connect(clients, "ipc://@server:5555");

        while(true) {
            string request = name + ":hello";
            int ret = zmq_send(clients, request.c_str(), request.size(), 0);
            char reply[1024] = "";
            ret = zmq_recv(clients, reply, 1024, 0);
            mSleep(0, 50);
        }
        return (NULL);
    }

    int main(int argc, char** argv) {

        void* context = zmq_ctx_new();

        for(size_t i = 0; i < 10; i++) {
            string name = "client" + std::to_string(i);
            thread client(worker_routine, context, name);
            client.detach();
        }

        while(1) {
            mSleep(0, 20);
        }

        zmq_ctx_destroy(context);

        return 0;
    }
...