Eventloop.future контекст Pyzmq с 'inproc' в рабочих потоках - PullRequest
0 голосов
/ 15 октября 2018

Часть моего приложения состоит из основного потока и пары рабочих потоков, которые отправляют свои результаты в основной поток с помощью сокетов pyzmq.Основной поток запускает tornado IOloop и использует функции async для чтения входящих данных на различных типах сокетов, созданных с использованием future.context.

. Из соображений производительности я хотел бы использовать inproc протокол.Однако inproc работает только тогда, когда основной поток и рабочие потоки имеют одинаковый контекст.С другой стороны, для этого потребуется, чтобы каждый рабочий поток выполнял tornado IOloop, что, на мой взгляд, является излишним для простых работников.

Минимальный пример, иллюстрирующий проблему:

import time
from threading import Thread

import zmq
from zmq.eventloop.future import Context as FutureContext
import tornado.ioloop

def worker(ctx):
    socket = ctx.socket(zmq.PUSH)
    socket.bind('inproc://worker')

    while True:
        # Do some work
        time.sleep(2)
        socket.send_pyobj("Work done")

async def mainLoop(ctx):
    socket = ctx.socket(zmq.PULL)
    socket.connect('inproc://worker')

    while True:
        #print(socket.recv_pyobj())
        print(await socket.recv_pyobj()) 


normalCtx = zmq.Context()
futureCtx = FutureContext()

t = Thread(target=worker, kwargs=dict(ctx=normalCtx))
t.start()

# wait for bind to be effective
time.sleep(4)

io_loop = tornado.ioloop.IOLoop.current()
io_loop.spawn_callback(mainLoop, ctx=futureCtx)
io_loop.start()

В этом примере сокет PULL не будет получать сообщения, поскольку он выдается в другом контексте, чем рабочий сокет.Пример работает нормально, если я использую normalCtx в обоих потоках (удаление await).Он также отлично работает при использовании TCP в качестве транспортного протокола.

Я нашел следующие решения, чтобы заставить его работать:

  1. Использовать контекст normal и отказаться от async/await.
  2. Используйте future контекст и запустите ioloop на каждом работнике.
  3. Используйте TCP в качестве транспортного протокола.

Мой вопрос: есть лиВолшебный трюк, чтобы заставить его работать с async/await, inproc и не запускать ioloops на рабочих, например, путем доступа к контексту future способом non-future?

...