Я работаю над "серверным" потоком, который обрабатывает некоторые вызовы ввода-вывода для группы "клиентов".
Связь осуществляется с использованием pynng v0.5.0 сервер имеет свою собственную asyncio l oop.
Каждый клиент «регистрируется», отправляя первый запрос, а затем повторяет цикл получения результатов и отправки сообщений READY.
На сервере цель состоит в том, чтобы обрабатывать первое сообщение каждого клиента как запрос на регистрацию и создавать выделенную рабочую задачу, которая будет oop выполнять ввод-вывод, отправлять результат и ждать сообщения ГОТОВ этот конкретный клиент.
Чтобы реализовать это, я пытаюсь использовать функцию Context сокетов REP0.
Примечания
Мне бы хотелось отметить этот вопрос как nng и pynng , но у меня недостаточно репутации.
Хотя я заядлый потребитель этого сайта, он это мой первый вопрос:)
Я действительно знаю о паттерне PUB / SUB, давайте просто скажем, что в целях самообучения я решил не использовать это для этой услуги.
Проблема:
После нескольких итераций некоторые сообщения READY перехватываются сопрограммой регистрации сервер, вместо того, чтобы перенаправлять на правильную рабочую задачу.
Поскольку я не могу поделиться кодом, я написал репродуктор для своей проблемы и включил его ниже.
Хуже, как вы можете смотрите в выводе, что некоторые сообщения о результатах отправляются не тому клиенту (ERROR:root:<Worker 1>: worker/client mismatch, exiting.
).
Это похоже на ошибку, но я не совсем уверен, что понимаю, как правильно использовать контексты, поэтому любой приветствуется помощь.
Среда:
Код:
import asyncio
import logging
import pynng
import threading
NNG_DURATION_INFINITE = -1
ENDPOINT = 'inproc://example_endpoint'
class Server(threading.Thread):
def __init__(self):
super(Server, self).__init__()
self._client_tasks = dict()
@staticmethod
async def _worker(ctx, client_id):
while True:
# Remember, the first 'receive' has already been done by self._new_client_handler()
logging.debug(f"<Worker {client_id}>: doing some IO")
await asyncio.sleep(1)
logging.debug(f"<Worker {client_id}>: sending the result")
# I already tried sending synchronously here instead, just in case the issue was related to that
# (but it's not)
await ctx.asend(f"result data for client {client_id}".encode())
logging.debug(f"<Worker {client_id}>: waiting for client READY msg")
data = await ctx.arecv()
logging.debug(f"<Worker {client_id}>: received '{data}'")
if data != bytes([client_id]):
logging.error(f"<Worker {client_id}>: worker/client mismatch, exiting.")
return
async def _new_client_handler(self):
with pynng.Rep0(listen=ENDPOINT) as socket:
max_workers = 3 + 1 # Try setting it to 3 instead, to stop creating new contexts => now it works fine
while await asyncio.sleep(0, result=True) and len(self._client_tasks) < max_workers:
# The issue is here: at some point, the existing client READY messages get
# intercepted here, instead of being routed to the proper worker context.
# The intent here was to open a new context only for each *new* client, I was
# assuming that a 'recv' on older worker contexts would take precedence.
ctx = socket.new_context()
data = await ctx.arecv()
client_id = data[0]
if client_id in self._client_tasks:
logging.error(f"<Server>: We already have a task for client {client_id}")
continue # just let the client block on its 'recv' for now
logging.debug(f"<Server>: New client : {client_id}")
self._client_tasks[client_id] = asyncio.create_task(self._worker(ctx, client_id))
await asyncio.gather(*list(self._client_tasks.values()))
def run(self) -> None:
# The "server" thread has its own asyncio loop
asyncio.run(self._new_client_handler(), debug=True)
class Client(threading.Thread):
def __init__(self, client_id: int):
super(Client, self).__init__()
self._id = client_id
def __repr__(self):
return f'<Client {self._id}>'
def run(self):
with pynng.Req0(dial=ENDPOINT, resend_time=NNG_DURATION_INFINITE) as socket:
while True:
logging.debug(f"{self}: READY")
socket.send(bytes([self._id]))
data_str = socket.recv().decode()
logging.debug(f"{self}: received '{data_str}'")
if data_str != f"result data for client {self._id}":
logging.error(f"{self}: client/worker mismatch, exiting.")
return
def main():
logging.basicConfig(level=logging.DEBUG)
threads = [Server(),
*[Client(i) for i in range(3)]]
for t in threads:
t.start()
for t in threads:
t.join()
if __name__ == '__main__':
main()
Выход: * 1 081 *
DEBUG:asyncio:Using proactor: IocpProactor
DEBUG:root:<Client 1>: READY
DEBUG:root:<Client 0>: READY
DEBUG:root:<Client 2>: READY
DEBUG:root:<Server>: New client : 1
DEBUG:root:<Worker 1>: doing some IO
DEBUG:root:<Server>: New client : 0
DEBUG:root:<Worker 0>: doing some IO
DEBUG:root:<Server>: New client : 2
DEBUG:root:<Worker 2>: doing some IO
DEBUG:root:<Worker 1>: sending the result
DEBUG:root:<Client 1>: received 'result data for client 1'
DEBUG:root:<Client 1>: READY
ERROR:root:<Server>: We already have a task for client 1
DEBUG:root:<Worker 1>: waiting for client READY msg
DEBUG:root:<Worker 0>: sending the result
DEBUG:root:<Client 0>: received 'result data for client 0'
DEBUG:root:<Client 0>: READY
DEBUG:root:<Worker 0>: waiting for client READY msg
DEBUG:root:<Worker 1>: received 'b'\x00''
ERROR:root:<Worker 1>: worker/client mismatch, exiting.
DEBUG:root:<Worker 2>: sending the result
DEBUG:root:<Client 2>: received 'result data for client 2'
DEBUG:root:<Client 2>: READY
DEBUG:root:<Worker 2>: waiting for client READY msg
ERROR:root:<Server>: We already have a task for client 2
Редактировать (2020-04-10): обновил и pynng, и базовый nng.lib до последней версии (основные ветви), но проблема остается той же.