ZMQ Poller не работает в экземпляре класса - PullRequest
2 голосов
/ 19 марта 2012

Привет, у меня возникли некоторые проблемы с упаковкой некоторых клиентов ZMQ в классах Python.Эти классы создаются и вызываются в подпроцессе через многопроцессорный модуль.Когда клиенты являются функциями, все работают, но когда они являются классами, poller.poll () зависает.

Код ниже имеет обе версии: одна работает, другая нет.Почему?

import zmq
import time
import sys
import random
from  multiprocessing import Process

def server_push(port="5556"):
    context = zmq.Context()
    socket = context.socket(zmq.PUSH)
    socket.bind("tcp://*:%s" % port)
    print "Running server on port: ", port
    # serves only 5 request and dies
    for reqnum in range(10):
        if reqnum < 6:
            socket.send("Continue")
        else:
            socket.send("Exit")
            break
        time.sleep (1) 

def server_pub(port="5558"):
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.bind("tcp://*:%s" % port)
    publisher_id = random.randrange(0,9999)
    print "Running server on port: ", port
    # serves only 5 request and dies
    for reqnum in range(10):
        # Wait for next request from client
        topic = random.randrange(8,10)
        messagedata = "server#%s" % publisher_id
        print "%s %s" % (topic, messagedata)
        socket.send("%d %s" % (topic, messagedata))
        time.sleep(1)    


class Client:
    def __init__(self,port_push, port_sub):
        context = zmq.Context()
        self.socket_pull = context.socket(zmq.PULL)
        self.socket_pull.connect ("tcp://localhost:%s" % port_push)
        print "Connected to server with port %s" % port_push
        self.socket_sub = context.socket(zmq.SUB)
        self.socket_sub.connect ("tcp://localhost:%s" % port_sub)
        self.socket_sub.setsockopt(zmq.SUBSCRIBE, "9")
        print "Connected to publisher with port %s" % port_sub
        # Initialize poll set


    def __call__(self):
        poller = zmq.Poller()
        poller.register(self.socket_pull, zmq.POLLIN)
        poller.register(self.socket_sub, zmq.POLLIN)
        # Work on requests from both server and publisher
        should_continue = True
        print "listening"
        while should_continue:
            print "hello"
            socks = dict(poller.poll())
            print poller
            if self.socket_pull in socks and socks[self.socket_pull] == zmq.POLLIN:
                message = self.socket_pull.recv()
                print "Recieved control command: %s" % message
                if message == "Exit": 
                    print "Recieved exit command, client will stop recieving messages"
                    should_continue = False

                if self.socket_sub in socks and socks[self.socket_sub] == zmq.POLLIN:
                    string = self.socket_sub.recv()
                    topic, messagedata = string.split()
                    print "Processing ... ", topic, messagedata

def client(port_push, port_sub):
    context = zmq.Context()
    socket_pull = context.socket(zmq.PULL)
    socket_pull.connect ("tcp://localhost:%s" % port_push)
    print "Connected to server with port %s" % port_push
    socket_sub = context.socket(zmq.SUB)
    socket_sub.connect ("tcp://localhost:%s" % port_sub)
    socket_sub.setsockopt(zmq.SUBSCRIBE, "9")
    print "Connected to publisher with port %s" % port_sub
    # Initialize poll set
    poller = zmq.Poller()
    poller.register(socket_pull, zmq.POLLIN)
    poller.register(socket_sub, zmq.POLLIN)
    # Work on requests from both server and publisher
    should_continue = True
    while should_continue:
        socks = dict(poller.poll())
        if socket_pull in socks and socks[socket_pull] == zmq.POLLIN:
            message = socket_pull.recv()
            print "Recieved control command: %s" % message
            if message == "Exit": 
                print "Recieved exit command, client will stop recieving messages"
                should_continue = False

        if socket_sub in socks and socks[socket_sub] == zmq.POLLIN:
            string = socket_sub.recv()
            topic, messagedata = string.split()
            print "Processing ... ", topic, messagedata

if __name__ == "__main__":
    # Now we can run a few servers 
    server_push_port = "5556"
    server_pub_port = "5558"
    Process(target=server_push, args=(server_push_port,)).start()
    Process(target=server_pub, args=(server_pub_port,)).start()
    #~ Process(target=client,args=(server_push_port,server_pub_port)).start()
    Process(target=Client(server_push_port,server_pub_port)).start()

1 Ответ

2 голосов
/ 19 марта 2012

Edit1: это не совсем правильно ... дай мне несколько минут, чтобы понять это правильно ...

Я думаю, что вы, возможно, вызываете класс Client неправильно.Я не эксперт в этом, но я думаю, что ваш клиент должен быть разделен на подклассы от Process, а затем запускаться с использованием функции .start ().Итак, определите ваш класс Client следующим образом:

class Client(Process):
    def __init__(self, port_push, port_sub):
        (...) # your class init code here...make sure indentation is correct

Затем в конце, где вы запускаете серверы, создайте экземпляр вашего класса Client и запустите его так:

client_class = Client(port_push, port_sub)
client_class.start()

Edit2: вот отредактированная версия кода fccoelho, которая работает для меня.

Самая большая проблема заключается в том, что инициализацию ZMQ нужно выполнять в методе __call__, а не в__init__.Я подозреваю, что это связано с тем, как память распределяется в многопроцессорном режиме: функция __init__ будет выполняться в родительском процессе, а функция __call__ - в дочернем процессе с отдельным пространством памяти.Видимо ZMQ это не нравится.Я также добавил несколько периодов ожидания, чтобы клиент не мог подключиться к серверу до его готовности и чтобы сервер не отправлял сообщения до того, как клиент подписался.Также использую 127.0.0.1 вместо localhost (мой компьютер почему-то не любит localhost).Также были удалены надоедливые сообщения печати вокруг вызова опроса в клиенте и исправлена ​​проблема с отступом, когда клиент проверяет результаты опроса на сокете pubsub.

import zmq
import time
import sys
import random
from  multiprocessing import Process

def server_push(port="5556"):
    context = zmq.Context()
    socket = context.socket(zmq.PUSH)
    socket.bind("tcp://127.0.0.1:%s" % port)
    print "Running server on port: ", port
    time.sleep(1.0)
    # serves only 5 request and dies
    for reqnum in range(10):
        if reqnum < 6:
            socket.send("Continue")
        else:
            socket.send("Exit")
            print 'Push server sent "Exit" signal'
            break
        time.sleep(0.4) 

def server_pub(port="5558"):
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.bind("tcp://127.0.0.1:%s" % port)
    socket.setsockopt(zmq.HWM, 1000)
    publisher_id = random.randrange(0,9999)
    print "Running server on port: ", port
    time.sleep(1.0)
    # serves only 5 request and dies
    for reqnum in range(10):
        # Wait for next request from client
        topic = random.randrange(8,10)
        messagedata = "server#%s" % publisher_id
        print "%s %s" % (topic, messagedata)
        socket.send("%d %s" % (topic, messagedata))
        time.sleep(0.4)    


class Client:
    def __init__(self,port_push, port_sub):
        self.port_push = port_push
        self.port_sub = port_sub
        # Initialize poll set

    def __call__(self):
        time.sleep(0.5)
        print 'hello from class client!'
        context = zmq.Context()
        self.socket_pull = context.socket(zmq.PULL)
        self.socket_pull.connect ("tcp://127.0.0.1:%s" % self.port_push)
        print "Connected to server with port %s" % self.port_push
        self.socket_sub = context.socket(zmq.SUB)
        self.socket_sub.connect ("tcp://127.0.0.1:%s" % self.port_sub)
        self.socket_sub.setsockopt(zmq.SUBSCRIBE, "9")
        print "Connected to publisher with port %s" % self.port_sub

        poller = zmq.Poller()
        poller.register(self.socket_pull, zmq.POLLIN)
        poller.register(self.socket_sub, zmq.POLLIN)
        # Work on requests from both server and publisher
        should_continue = True
        print "listening"
        while should_continue:
            # print "hello"
            socks = dict(poller.poll())
            # print poller
            if self.socket_pull in socks and socks[self.socket_pull] == zmq.POLLIN:
                message = self.socket_pull.recv()
                print "Recieved control command: %s" % message
                if message == "Exit": 
                    print "Recieved exit command, client will stop recieving messages"
                    should_continue = False

            if self.socket_sub in socks and socks[self.socket_sub] == zmq.POLLIN:
                string = self.socket_sub.recv()
                topic, messagedata = string.split()
                print "Processing ... ", topic, messagedata

def client(port_push, port_sub):
    print 'hello from function client!'
    context = zmq.Context()
    socket_pull = context.socket(zmq.PULL)
    socket_pull.connect ("tcp://127.0.0.1:%s" % port_push)
    print "Connected to server with port %s" % port_push
    socket_sub = context.socket(zmq.SUB)
    socket_sub.connect ("tcp://127.0.0.1:%s" % port_sub)
    socket_sub.setsockopt(zmq.SUBSCRIBE, "9")
    print "Connected to publisher with port %s" % port_sub
    # Initialize poll set
    poller = zmq.Poller()
    poller.register(socket_pull, zmq.POLLIN)
    poller.register(socket_sub, zmq.POLLIN)
    # Work on requests from both server and publisher
    should_continue = True
    while should_continue:
        socks = dict(poller.poll(1000))
        if socket_pull in socks and socks[socket_pull] == zmq.POLLIN:
            message = socket_pull.recv()
            print "Recieved control command: %s" % message
            if message == "Exit": 
                print "Recieved exit command, client will stop recieving messages"
                should_continue = False

        if socket_sub in socks and socks[socket_sub] == zmq.POLLIN:
            string = socket_sub.recv()
            topic, messagedata = string.split()
            print "Processing ... ", topic, messagedata

if __name__ == "__main__":
    # Now we can run a few servers 
    server_push_port = "5556"
    server_pub_port = "5558"
    Process(target=server_push, args=(server_push_port,)).start()
    Process(target=server_pub, args=(server_pub_port,)).start()
    # Process(target=client,args=(server_push_port,server_pub_port)).start()
    Process(target=Client(server_push_port,server_pub_port)).start()

Наконец, здесь более чистая реализация многопроцессорной обработки.pubsub, который очень прост, но демонстрирует вещи более наглядно:

import zmq
from multiprocessing import Process
import time

class ServerPubSub(Process):
    def __init__(self, port, n):
        Process.__init__(self)
        self.port = port
        self.n = n

    def run(self):
        self.context = zmq.Context()
        self.pub = self.context.socket(zmq.PUB)
        self.pub.bind('tcp://127.0.0.1:%d' % self.port)
        self.pub.setsockopt(zmq.HWM, 1000)

        time.sleep(1)

        end = False
        for i in range(self.n):
            print 'SRV: sending message %d' % i
            self.pub.send('Message %d' % i)
            print 'SRV: message %d sent' % i
            time.sleep(0.2)

        self.pub.close()

class ClientPubSub(Process):
    def __init__(self, port, n):
        Process.__init__(self)
        self.port = port
        self.n = n

    def run(self):
        self.context = zmq.Context()
        self.sub = self.context.socket(zmq.SUB)
        self.sub.connect('tcp://127.0.0.1:%d' % self.port)
        self.sub.setsockopt(zmq.SUBSCRIBE, '')
        self.poller = zmq.Poller()
        self.poller.register(self.sub, zmq.POLLIN)

        end = False
        count = 0
        while count < self.n:
            ready = dict(self.poller.poll(0))
            if self.sub in ready and ready[self.sub] == zmq.POLLIN:
                msg = self.sub.recv()
                print 'CLI: received message "%s"' % msg
                count += 1

        self.sub.close()

if __name__ == "__main__":
    port = 5000
    n = 10
    server = ServerPubSub(port, n)
    client = ClientPubSub(port, n)

    server.start()
    client.start()

    server.join()
    client.join()
...