Часть моего приложения состоит из основного потока и пары рабочих потоков, которые отправляют свои результаты в основной поток с помощью сокетов pyzmq
.Основной поток запускает tornado IOloop
и использует функции async
для чтения входящих данных на различных типах сокетов, созданных с использованием future.context
.
. Из соображений производительности я хотел бы использовать inproc
протокол.Однако inproc
работает только тогда, когда основной поток и рабочие потоки имеют одинаковый контекст.С другой стороны, для этого потребуется, чтобы каждый рабочий поток выполнял tornado IOloop
, что, на мой взгляд, является излишним для простых работников.
Минимальный пример, иллюстрирующий проблему:
import time
from threading import Thread
import zmq
from zmq.eventloop.future import Context as FutureContext
import tornado.ioloop
def worker(ctx):
socket = ctx.socket(zmq.PUSH)
socket.bind('inproc://worker')
while True:
# Do some work
time.sleep(2)
socket.send_pyobj("Work done")
async def mainLoop(ctx):
socket = ctx.socket(zmq.PULL)
socket.connect('inproc://worker')
while True:
#print(socket.recv_pyobj())
print(await socket.recv_pyobj())
normalCtx = zmq.Context()
futureCtx = FutureContext()
t = Thread(target=worker, kwargs=dict(ctx=normalCtx))
t.start()
# wait for bind to be effective
time.sleep(4)
io_loop = tornado.ioloop.IOLoop.current()
io_loop.spawn_callback(mainLoop, ctx=futureCtx)
io_loop.start()
В этом примере сокет PULL
не будет получать сообщения, поскольку он выдается в другом контексте, чем рабочий сокет.Пример работает нормально, если я использую normalCtx
в обоих потоках (удаление await
).Он также отлично работает при использовании TCP в качестве транспортного протокола.
Я нашел следующие решения, чтобы заставить его работать:
- Использовать контекст
normal
и отказаться от async/await
. - Используйте
future
контекст и запустите ioloop
на каждом работнике. - Используйте TCP в качестве транспортного протокола.
Мой вопрос: есть лиВолшебный трюк, чтобы заставить его работать с async/await
, inproc
и не запускать ioloops
на рабочих, например, путем доступа к контексту future
способом non-future
?