В моей программе на Python у меня есть поток, который получает сообщения от других потоков и передает их через PythonZMQ на удаленный компьютер и получает от него ответы.
Поскольку сокеты ZMQ не являются потокобезопасными , я хочу обрабатывать как отправку, так и получение сообщений в одном потоке.Отправляемые сообщения могут быть доставлены очередью , а о прибытии нового сообщения может быть сообщено с помощью условной переменной , поэтому я должен дождаться его, используя Condition.waitметод.Однако получение ответа следует ожидать в функции pyzmq poll .Проблема в том, что я не могу ждать одновременно в Condition.wait и pyzmq poll.
Я вижу один обходной путь - использование сокета ZMQ для передачи сообщений, которые будут отправлены из других потоков в поток, обрабатывающий удаленную связь.Однако это кажется излишним.Можно ли сделать это проще?
Пример кода (использование ZMQ вместо очереди)
Основной процесс, который генерирует сообщения, отправляет их процессоруи получает ответы:
import zmq
import time
import random
import threading
#Thread that delivers the messages
def msg_source(zctx):
#Prepare the socket for communication with the message receiver
s=zctx.socket(zmq.PAIR)
s.connect('inproc://src')
while True:
r=random.Random()
time.sleep(1+2*r.random())
print(".")
s.send("msg "+str(time.time()))
#Initialize the context
ctx=zmq.Context()
#Prepare the socket for communication with the message source
s1=ctx.socket(zmq.PAIR)
s1.bind('inproc://src')
#Prepare the socket for communucation with the message processor
s2=ctx.socket(zmq.PAIR)
s2.connect('tcp://0.0.0.0:8998')
t=threading.Thread(target=msg_source, args=(ctx,))
t.daemon = True
t.start()
p=zmq.Poller()
p.register(s1,flags=zmq.POLLIN)
p.register(s2,flags=zmq.POLLIN)
while True:
e=p.poll()
for tp in e:
if tp[0]==s1:
m=s1.recv()
s2.send(m)
print("s1:"+m)
if tp[0]==s2:
m=s2.recv()
print("s2:"+m)
«Процессор», который получает сообщения и отправляет ответы.