Подписчики получают сообщения медленно - PullRequest
1 голос
/ 26 сентября 2019

У меня есть издатель pyzmq, который отправляет около 1000 сообщений в секунду.Я пытаюсь запустить около 10 подписчиков в разных темах с помощью asyncio event_loop.

Он работает, но примерно в 10 раз медленнее, чем скорость только одного подписчика.

Что может быть не так с кодом?

import asyncio
import zmq
import json
from zmq.backend.cython.constants import NOBLOCK
from zmq.asyncio import Context, Poller
from loop_ import Loop


class Client:
    REQUEST_TIMEOUT = 35000
    SERVER_ENDPOINT = "tcp://localhost:6666"

    def __init__(self, id_, loop_):
        self.id = id_
        self.loop = loop_
        self.queue = asyncio.Queue(loop=self.loop.get())

    def run(self):
        self.loop.send_task(self.start)

    async def start(self):
        client_task = asyncio.create_task(self.client_coroutine())
        done, pending = await asyncio.wait({client_task}, return_when=asyncio.FIRST_COMPLETED)
        print(done, pending)

    async def client_coroutine(self):
        context = Context.instance()

        socket = context.socket(zmq.SUB)
        socket.connect(self.SERVER_ENDPOINT)
        socket.setsockopt(zmq.SUBSCRIBE, b'4')
        poller = Poller()
        poller.register(socket, zmq.POLLIN)

        while True:
            event = dict(await poller.poll(self.REQUEST_TIMEOUT))
            if event.get(socket) == zmq.POLLIN:
                reply = await socket.recv_multipart(flags=NOBLOCK)
                if not reply:
                    break
                else:
                    print(eval(json.loads(reply[1].decode('utf-8'))))
            else:
                print("No response from server, retrying...")
                socket.setsockopt(zmq.LINGER, 0)
                socket.close()
                poller.unregister(socket)

loop = Loop()
loop.run()
clients = [Client(id_, loop) for id_ in range(10)]

for c in clients:
    c.run()
input()

Модуль loop_ содержит:

import asyncio
from threading import Thread
from time import sleep
import structlog


class Loop:

    def __init__(self):
        self._loop = asyncio.new_event_loop()
        self._thread = Thread(target=self._run_loop, args=(self._loop,), daemon=True)
        self.logger = structlog.get_logger(__name__)

    @staticmethod
    def _run_loop(loop_):
        asyncio.set_event_loop(loop_)
        loop_.set_debug(False)
        loop_.run_forever()

    def run(self):
        self._thread.start()

    def stop_and_close(self):
        asyncio.set_event_loop(self._loop)
        self._loop.call_soon_threadsafe(self._loop.stop)
        while self.is_running():
            sleep(0.01)
        self._loop.close()
        self.logger.debug(f"{self._loop}, thread is alive: {self.thread_is_alive()}")

    def thread_is_alive(self):
        return self._thread.is_alive()

    def is_running(self):
        return self._loop.is_running()

    def is_closed(self):
        return self._loop.is_closed()

    def get(self):
        return self._loop

    def send_task(self, task_, *args):
        return asyncio.run_coroutine_threadsafe(task_(*args), self._loop)
...