pynng: как настроить и продолжать использовать несколько контекстов в сокете REP0 - PullRequest
2 голосов
/ 08 апреля 2020

Я работаю над "серверным" потоком, который обрабатывает некоторые вызовы ввода-вывода для группы "клиентов".

Связь осуществляется с использованием pynng v0.5.0 сервер имеет свою собственную asyncio l oop.

Каждый клиент «регистрируется», отправляя первый запрос, а затем повторяет цикл получения результатов и отправки сообщений READY.

На сервере цель состоит в том, чтобы обрабатывать первое сообщение каждого клиента как запрос на регистрацию и создавать выделенную рабочую задачу, которая будет oop выполнять ввод-вывод, отправлять результат и ждать сообщения ГОТОВ этот конкретный клиент.

Чтобы реализовать это, я пытаюсь использовать функцию Context сокетов REP0.

Примечания

  • Мне бы хотелось отметить этот вопрос как и , но у меня недостаточно репутации.

  • Хотя я заядлый потребитель этого сайта, он это мой первый вопрос:)

  • Я действительно знаю о паттерне PUB / SUB, давайте просто скажем, что в целях самообучения я решил не использовать это для этой услуги.

Проблема:

После нескольких итераций некоторые сообщения READY перехватываются сопрограммой регистрации сервер, вместо того, чтобы перенаправлять на правильную рабочую задачу.

Поскольку я не могу поделиться кодом, я написал репродуктор для своей проблемы и включил его ниже.

Хуже, как вы можете смотрите в выводе, что некоторые сообщения о результатах отправляются не тому клиенту (ERROR:root:<Worker 1>: worker/client mismatch, exiting.).

Это похоже на ошибку, но я не совсем уверен, что понимаю, как правильно использовать контексты, поэтому любой приветствуется помощь.

Среда:

Код:

import asyncio
import logging
import pynng
import threading

NNG_DURATION_INFINITE = -1
ENDPOINT = 'inproc://example_endpoint'


class Server(threading.Thread):
    def __init__(self):
        super(Server, self).__init__()
        self._client_tasks = dict()

    @staticmethod
    async def _worker(ctx, client_id):
        while True:
            # Remember, the first 'receive' has already been done by self._new_client_handler()

            logging.debug(f"<Worker {client_id}>: doing some IO")
            await asyncio.sleep(1)

            logging.debug(f"<Worker {client_id}>: sending the result")
            # I already tried sending synchronously here instead, just in case the issue was related to that
            # (but it's not)
            await ctx.asend(f"result data for client {client_id}".encode())

            logging.debug(f"<Worker {client_id}>: waiting for client READY msg")
            data = await ctx.arecv()
            logging.debug(f"<Worker {client_id}>: received '{data}'")
            if data != bytes([client_id]):
                logging.error(f"<Worker {client_id}>: worker/client mismatch, exiting.")
                return

    async def _new_client_handler(self):
        with pynng.Rep0(listen=ENDPOINT) as socket:
            max_workers = 3 + 1  # Try setting it to 3 instead, to stop creating new contexts => now it works fine
            while await asyncio.sleep(0, result=True) and len(self._client_tasks) < max_workers:
                # The issue is here: at some point, the existing client READY messages get
                # intercepted here, instead of being routed to the proper worker context.
                # The intent here was to open a new context only for each *new* client, I was
                # assuming that a 'recv' on older worker contexts would take precedence.
                ctx = socket.new_context()
                data = await ctx.arecv()
                client_id = data[0]

                if client_id in self._client_tasks:
                    logging.error(f"<Server>: We already have a task for client {client_id}")
                    continue  # just let the client block on its 'recv' for now

                logging.debug(f"<Server>: New client : {client_id}")
                self._client_tasks[client_id] = asyncio.create_task(self._worker(ctx, client_id))

            await asyncio.gather(*list(self._client_tasks.values()))

    def run(self) -> None:
        # The "server" thread has its own asyncio loop
        asyncio.run(self._new_client_handler(), debug=True)


class Client(threading.Thread):
    def __init__(self, client_id: int):
        super(Client, self).__init__()
        self._id = client_id

    def __repr__(self):
        return f'<Client {self._id}>'

    def run(self):
        with pynng.Req0(dial=ENDPOINT, resend_time=NNG_DURATION_INFINITE) as socket:
            while True:
                logging.debug(f"{self}: READY")
                socket.send(bytes([self._id]))
                data_str = socket.recv().decode()
                logging.debug(f"{self}: received '{data_str}'")
                if data_str != f"result data for client {self._id}":
                    logging.error(f"{self}: client/worker mismatch, exiting.")
                    return


def main():
    logging.basicConfig(level=logging.DEBUG)
    threads = [Server(),
               *[Client(i) for i in range(3)]]
    for t in threads:
        t.start()
    for t in threads:
        t.join()


if __name__ == '__main__':
    main()

Выход: * 1 081 *

DEBUG:asyncio:Using proactor: IocpProactor
DEBUG:root:<Client 1>: READY
DEBUG:root:<Client 0>: READY
DEBUG:root:<Client 2>: READY
DEBUG:root:<Server>: New client : 1
DEBUG:root:<Worker 1>: doing some IO
DEBUG:root:<Server>: New client : 0
DEBUG:root:<Worker 0>: doing some IO
DEBUG:root:<Server>: New client : 2
DEBUG:root:<Worker 2>: doing some IO
DEBUG:root:<Worker 1>: sending the result
DEBUG:root:<Client 1>: received 'result data for client 1'
DEBUG:root:<Client 1>: READY
ERROR:root:<Server>: We already have a task for client 1
DEBUG:root:<Worker 1>: waiting for client READY msg
DEBUG:root:<Worker 0>: sending the result
DEBUG:root:<Client 0>: received 'result data for client 0'
DEBUG:root:<Client 0>: READY
DEBUG:root:<Worker 0>: waiting for client READY msg
DEBUG:root:<Worker 1>: received 'b'\x00''
ERROR:root:<Worker 1>: worker/client mismatch, exiting.
DEBUG:root:<Worker 2>: sending the result
DEBUG:root:<Client 2>: received 'result data for client 2'
DEBUG:root:<Client 2>: READY
DEBUG:root:<Worker 2>: waiting for client READY msg
ERROR:root:<Server>: We already have a task for client 2

Редактировать (2020-04-10): обновил и pynng, и базовый nng.lib до последней версии (основные ветви), но проблема остается той же.

1 Ответ

1 голос
/ 18 апреля 2020

После изучения источников nng и pynng и подтверждения моего понимания с сопровождающими, я теперь могу ответить на свой собственный вопрос.

При использовании контекста в сокете REP0 есть несколько вещей чтобы быть в курсе.

Как заявлено, send / asend () гарантированно будет перенаправлено на тот же узел, с которого вы последний раз получили.

Данные от следующего recv / arecv () в этом же контексте, однако, НЕ гарантированно поступает от одного и того же партнера.

На самом деле, базовый вызов nng к rep0_ctx_recv() просто читает следующий сокет канал с доступными данными, поэтому нет никакой гарантии, что данные поступают с того же узла, что и последняя пара recv / send.

В приведенном выше репозитории я одновременно вызывал arecv () как в новом контексте (в сопрограмме Server._new_client_handler()), так и в каждом рабочем контексте (в сопрограмме Server._worker()).

Итак, то, что я ранее описал как следующий запрос, «перехватывается» главный коро время было просто условием гонки.

Одним из решений было бы получение только от сопрограммы Server._new_client_handler(), и чтобы работники обрабатывали только один запрос. Обратите внимание, что в этом случае работники больше не привязаны к конкретному сверстнику. Если это необходимо, маршрутизация входящих запросов должна обрабатываться на уровне приложения.

class Server(threading.Thread):
    @staticmethod
    async def _worker(ctx, data: bytes):
        client_id = int.from_bytes(data, byteorder='big', signed=False)
        logging.debug(f"<Worker {client_id}>: doing some IO")
        await asyncio.sleep(1 + 10 * random.random())

        logging.debug(f"<Worker {client_id}>: sending the result")
        await ctx.asend(f"result data for client {client_id}".encode())

    async def _new_client_handler(self):
        with pynng.Rep0(listen=ENDPOINT) as socket:
            while await asyncio.sleep(0, result=True):
                ctx = socket.new_context()
                data = await ctx.arecv()
                asyncio.create_task(self._worker(ctx, data))

    def run(self) -> None:
        # The "server" thread has its own asyncio loop
        asyncio.run(self._new_client_handler(), debug=False)
...