У меня есть издатель pyzmq, который отправляет около 1000 сообщений в секунду.Я пытаюсь запустить около 10 подписчиков в разных темах с помощью asyncio event_loop.
Он работает, но примерно в 10 раз медленнее, чем скорость только одного подписчика.
Что может быть не так с кодом?
import asyncio
import zmq
import json
from zmq.backend.cython.constants import NOBLOCK
from zmq.asyncio import Context, Poller
from loop_ import Loop
class Client:
REQUEST_TIMEOUT = 35000
SERVER_ENDPOINT = "tcp://localhost:6666"
def __init__(self, id_, loop_):
self.id = id_
self.loop = loop_
self.queue = asyncio.Queue(loop=self.loop.get())
def run(self):
self.loop.send_task(self.start)
async def start(self):
client_task = asyncio.create_task(self.client_coroutine())
done, pending = await asyncio.wait({client_task}, return_when=asyncio.FIRST_COMPLETED)
print(done, pending)
async def client_coroutine(self):
context = Context.instance()
socket = context.socket(zmq.SUB)
socket.connect(self.SERVER_ENDPOINT)
socket.setsockopt(zmq.SUBSCRIBE, b'4')
poller = Poller()
poller.register(socket, zmq.POLLIN)
while True:
event = dict(await poller.poll(self.REQUEST_TIMEOUT))
if event.get(socket) == zmq.POLLIN:
reply = await socket.recv_multipart(flags=NOBLOCK)
if not reply:
break
else:
print(eval(json.loads(reply[1].decode('utf-8'))))
else:
print("No response from server, retrying...")
socket.setsockopt(zmq.LINGER, 0)
socket.close()
poller.unregister(socket)
loop = Loop()
loop.run()
clients = [Client(id_, loop) for id_ in range(10)]
for c in clients:
c.run()
input()
Модуль loop_ содержит:
import asyncio
from threading import Thread
from time import sleep
import structlog
class Loop:
def __init__(self):
self._loop = asyncio.new_event_loop()
self._thread = Thread(target=self._run_loop, args=(self._loop,), daemon=True)
self.logger = structlog.get_logger(__name__)
@staticmethod
def _run_loop(loop_):
asyncio.set_event_loop(loop_)
loop_.set_debug(False)
loop_.run_forever()
def run(self):
self._thread.start()
def stop_and_close(self):
asyncio.set_event_loop(self._loop)
self._loop.call_soon_threadsafe(self._loop.stop)
while self.is_running():
sleep(0.01)
self._loop.close()
self.logger.debug(f"{self._loop}, thread is alive: {self.thread_is_alive()}")
def thread_is_alive(self):
return self._thread.is_alive()
def is_running(self):
return self._loop.is_running()
def is_closed(self):
return self._loop.is_closed()
def get(self):
return self._loop
def send_task(self, task_, *args):
return asyncio.run_coroutine_threadsafe(task_(*args), self._loop)