Дочерний процесс не получает сообщений при использовании ZMQ и Tornado - PullRequest
0 голосов
/ 09 ноября 2019

Я пытался запустить клиент, который имеет соединение веб-сокета с сервером веб-сокета, который отвечает на команды, выданные клиентом. Клиент принимает команды из stdin и отправляет другому процессу netclient, который отправляет эти команды дальше на сервер websocket. Сервер websocket является сервером Tornado. Клиент разговаривает с netclient с zmq. В конечном итоге этот клиент будет либо urwid клиентом (TUI), и / или PySide клиентом (GUI), поэтому я хотел бы повторно использовать тот же код netclient.

Моим первоначальным решением было запустить netclient в отдельном потоке для правильной инкапсуляции. Это работает, но когда я резко закрываю клиент с помощью Ctrl+C, я получаю ошибку Exception ignored in: <module 'threading'. Я хотел устранить ошибку, поэтому вместо этого переключился на multiprocessing.Process, прочитав, что Ctrl+C является особенно сложной задачей для потоков. После переключения на multiprocessing.Process код перестал работать и выдал следующую загадочную ошибку:

Traceback (most recent call last):
  File "/home/inspironxps/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/home/inspironxps/Downloads/netclient.py", line 104, in run
    self.ioloop.current().start()
  File "/home/inspironxps/.local/lib/python3.7/site-packages/tornado/platform/asyncio.py", line 148, in start
    self.asyncio_loop.run_forever()
  File "/home/inspironxps/.pyenv/versions/3.7.4/lib/python3.7/asyncio/base_events.py", line 534, in run_forever
    self._run_once()
  File "/home/inspironxps/.pyenv/versions/3.7.4/lib/python3.7/asyncio/base_events.py", line 1771, in _run_once
    handle._run()
  File "/home/inspironxps/.pyenv/versions/3.7.4/lib/python3.7/asyncio/events.py", line 88, in _run
    self._context.run(self._callback, *self._args)
  File "/home/inspironxps/.local/lib/python3.7/site-packages/tornado/platform/asyncio.py", line 138, in _handle_events
    handler_func(fileobj, events)
  File "/home/inspironxps/.local/lib/python3.7/site-packages/zmq/eventloop/zmqstream.py", line 452, in _handle_events
    zmq_events = self.socket.EVENTS
  File "/home/inspironxps/.local/lib/python3.7/site-packages/zmq/sugar/attrsettr.py", line 48, in __getattr__
    return self._get_attr_opt(upper_key, opt)
  File "/home/inspironxps/.local/lib/python3.7/site-packages/zmq/sugar/attrsettr.py", line 52, in _get_attr_opt
    return self.get(opt)
  File "zmq/backend/cython/socket.pyx", line 496, in zmq.backend.cython.socket.Socket.get
  File "zmq/backend/cython/socket.pyx", line 262, in zmq.backend.cython.socket._getsockopt
  File "zmq/backend/cython/checkrc.pxd", line 15, in zmq.backend.cython.checkrc._check_rc
  File "<frozen importlib._bootstrap>", line 983, in _find_and_load
  File "<frozen importlib._bootstrap>", line 963, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 906, in _find_spec
  File "<frozen importlib._bootstrap_external>", line 1280, in find_spec
  File "<frozen importlib._bootstrap_external>", line 1252, in _get_spec
  File "<frozen importlib._bootstrap_external>", line 1391, in find_spec

Я планирую использовать аналогичную конфигурацию на сервере, чтобы ускорить несколько процессов / потоков, где каждый из этих процессовне делает то же самое. Итак, мой вопрос как мне безопасно завершить работу клиента или еще лучше, как лучше подойти к решению вышеуказанной проблемы ? Я использую Tornado 6.0.2, PyZMQ 18.1.0 и Kubuntu 18.04.2 LTS.

Код netclient

# netclient.py
import zmq
import logging
import asyncio

from ast import literal_eval
from multiprocessing import Process
# from threading import Thread
from zmq.eventloop.zmqstream import ZMQStream
from tornado import ioloop, websocket
# from serverenums import GameStatus

# from tornado.platform.asyncio import AnyThreadEventLoopPolicy
# asyncio.set_event_loop_policy(AnyThreadEventLoopPolicy())
asyncio.set_event_loop(asyncio.new_event_loop())

class NetClient(Process):
    def __init__(self, port, server_url, **kwargs):
        super(NetClient, self).__init__()
        self.logger = logging.getLogger(__name__)
        self.wsconn = None
        self.wsconn_close = False
        self.has_wsconn_initialised = False
        # self.has_sent_wsconn_initialised_msg = False
        self.forced_exit = False
        self.game_url = server_url
        self.ctx = zmq.Context.instance()
        self.socket = self.ctx.socket(zmq.PAIR)
        # do not use localhost, because it would complain
        # with error [zmq.error.ZMQError: No such device]
        # self.socket.bind('inproc://uisocket')
        self.socket.bind('tcp://127.0.0.1:{}'.format(port))
        self.stream = ZMQStream(self.socket)
        self.stream.on_recv(self.communicate_with_server)
        self.logger.debug(f'Running on a port: {str(port)}')
        self.ioloop = ioloop.IOLoop()

    async def read_with_websocket(self):
        if self.wsconn_close == True:
            self.wsconn.close()
            self.socket.send_pyobj(dict(
                cmd='ENDED'
            ))
            return
        msg_recv = await self.wsconn.read_message()
        self.logger.debug(f'Received from websocket={msg_recv}')
        if msg_recv is None:
            self.wsconn.close()
            self.socket.send_pyobj(dict(
                cmd='ENDED'
            ))
        msg = literal_eval(msg_recv)
        self.logger.debug('Received from websocket(Decoded)={}'.format(str(msg)))
        return msg

    async def send_with_websocket(self, msg):
        if msg:
            if isinstance(self.wsconn,
                          websocket.WebSocketClientConnection):
                self.logger.debug('Sending game request')
                # Cannot send a dict(somehow)
                await self.wsconn.write_message(str(msg))
            else:
                self.forced_exit = True
                raise RuntimeError("Websocket connection closed")
        else:
            self.wsconn_close = True

    async def init_game_conn(self):
        self.logger.debug('Creating initial game server connection')
        try:
            self.wsconn = await websocket.websocket_connect(self.game_url)
        except Exception as err:
            print("Connection error: {}".format(err))
        else:
            self.logger.debug('Connection established')
            # if self.wsconn and not self.has_wsconn_initialised:
            #     self.has_wsconn_initialised = True
            #     self.socket.send_pyobj(dict(cmd='STARTED', msg='Connection established'))
            self.has_wsconn_initialised = True

    async def communicate_with_server(self):
        while True:
            msg_recv = self.socket.recv_pyobj()
            self.logger.debug(f'Received from playerui={str(msg_recv)}')
            if msg_recv.get('cmd') == 'ENDED' and not self.forced_exit:
                self.cleanup()
            elif not self.has_wsconn_initialised:
                await self.init_game_conn()
            else:
                await self.send_with_websocket(msg_recv)
                msg_recv = await self.read_with_websocket()
                self.socket.send_pyobj(msg_recv)

    def cleanup(self):
        self.forced_exit = True
        self.socket.close()
        self.ctx.term()
        self.logger.debug('Closing the netclient')
        self.ioloop.stop()

    def run(self):
        self.logger.debug(f'Running on ioloop = ${str(self.ioloop)}')
        self.ioloop.current().spawn_callback(self.communicate_with_server)
        self.ioloop.current().start()

Код клиента

# client.py 
import zmq
import sys
import logging
import uuid

from tornado import ioloop
# from serverenums import GameStatus
from netclient import NetClient

rootlogger = logging.getLogger()
rootlogger.setLevel(logging.DEBUG)

stream_handler = logging.StreamHandler(sys.stdout)
stream_handler.setLevel(logging.INFO)
stream_handler.setFormatter(logging.Formatter(logging.BASIC_FORMAT))

file_handler = logging.handlers.RotatingFileHandler(f'cmdui_{str(uuid.uuid4())}.log', maxBytes=(1048576*5), backupCount=7)
file_handler.setLevel(logging.DEBUG)
f_format = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(f_format)

rootlogger.addHandler(stream_handler)
rootlogger.addHandler(file_handler)


def get_random_port():
    import socket
    s = socket.socket()
    s.bind(("", 0))
    port_num = s.getsockname()[1]
    s.close()
    return port_num

class CmdUI(object):
    def __init__(self, port):
        self.logger = logging.getLogger(__name__)
        self.ctx = zmq.Context()
        self.socket = self.ctx.socket(zmq.PAIR)
        self.socket.connect('tcp://127.0.0.1:{0}'.format(port))
        self.msg_recv = None

    def get_str_input(self, question):
        while True:
            try:
                choice = input(question)
                if any((choice is None, not choice.strip())):
                    print('Error: Empty string entered!!!')
                    self.logger.error('Error: Empty string entered!!!')
                else:
                    return choice
            except Exception:
                raise

    def get_int_input(self, question):
        while True:
            try:
                choice = self.get_str_input(question)
                choice = int(choice)
                return choice
            except ValueError as err:
                print(err)
            except Exception:
                raise

    def close_game(self):
        self.logger.debug(f'Closing due to application panic')
        self.socket.send_pyobj(dict(
            cmd='ENDED'
        ))
        self.socket.close()
        self.ctx.term()
        self.logger.debug(f'Closing game')
        sys.exit(0)

    def start(self):
        self.logger.info('CmdUI')
        msg_recv = None
        try:
            while True:
                try:
                    msg_snd = self.get_str_input('Enter a message: ')
                    self.socket.send_pyobj(dict(cmd=msg_snd))
                    # Try to receive message
                    msg_recv = self.socket.recv_pyobj(flags=zmq.NOBLOCK)
                except zmq.ZMQError as exc:
                    if exc.errno == zmq.EAGAIN:
                        pass
                    else:
                        raise

                self.logger.info(f'Received {str(msg_recv)}')
                if msg_recv and msg_recv.get('cmd') == 'ENDED':
                    print('Player ended game session')
                    self.logger.debug('Player ended game session')
                    self.close_game()
        except Exception:
            self.close_game()

def main():
    ui, netclient = None, None
    try:
        port = get_random_port()
        netclient = NetClient(port, 'ws://localhost:8888/wshandler')
        ui = CmdUI(port)
        netclient.start()
        ui.start()
    except SystemExit:
        netclient.join()
        ui.logger.debug('Client appl ended')

if __name__ == '__main__':
    main()

Спасибо.

...