Python - как ждать в том же потоке условной переменной и / или события ввода / вывода? - PullRequest
0 голосов
/ 20 октября 2018

В моей программе на Python у меня есть поток, который получает сообщения от других потоков и передает их через PythonZMQ на удаленный компьютер и получает от него ответы.

Поскольку сокеты ZMQ не являются потокобезопасными , я хочу обрабатывать как отправку, так и получение сообщений в одном потоке.Отправляемые сообщения могут быть доставлены очередью , а о прибытии нового сообщения может быть сообщено с помощью условной переменной , поэтому я должен дождаться его, используя Condition.waitметод.Однако получение ответа следует ожидать в функции pyzmq poll .Проблема в том, что я не могу ждать одновременно в Condition.wait и pyzmq poll.

Я вижу один обходной путь - использование сокета ZMQ для передачи сообщений, которые будут отправлены из других потоков в поток, обрабатывающий удаленную связь.Однако это кажется излишним.Можно ли сделать это проще?

Пример кода (использование ZMQ вместо очереди)

Основной процесс, который генерирует сообщения, отправляет их процессоруи получает ответы:

import zmq
import time
import random
import threading

#Thread that delivers the messages
def msg_source(zctx):
  #Prepare the socket for communication with the message receiver
  s=zctx.socket(zmq.PAIR)
  s.connect('inproc://src')
  while True:
    r=random.Random()
    time.sleep(1+2*r.random())
    print(".")
    s.send("msg "+str(time.time()))

#Initialize the context
ctx=zmq.Context()
#Prepare the socket for communication with the message source
s1=ctx.socket(zmq.PAIR)
s1.bind('inproc://src')
#Prepare the socket for communucation with the message processor
s2=ctx.socket(zmq.PAIR)
s2.connect('tcp://0.0.0.0:8998')
t=threading.Thread(target=msg_source, args=(ctx,))
t.daemon = True
t.start()
p=zmq.Poller()
p.register(s1,flags=zmq.POLLIN)
p.register(s2,flags=zmq.POLLIN)
while True:
  e=p.poll()
  for tp in e:
    if tp[0]==s1:
      m=s1.recv()
      s2.send(m)
      print("s1:"+m)
    if tp[0]==s2:
      m=s2.recv()
      print("s2:"+m)

«Процессор», который получает сообщения и отправляет ответы.

...