Я пытался запустить клиент, который имеет соединение веб-сокета с сервером веб-сокета, который отвечает на команды, выданные клиентом. Клиент принимает команды из stdin и отправляет другому процессу netclient
, который отправляет эти команды дальше на сервер websocket. Сервер websocket является сервером Tornado. Клиент разговаривает с netclient
с zmq
. В конечном итоге этот клиент будет либо urwid клиентом (TUI), и / или PySide клиентом (GUI), поэтому я хотел бы повторно использовать тот же код netclient
.
Моим первоначальным решением было запустить netclient
в отдельном потоке для правильной инкапсуляции. Это работает, но когда я резко закрываю клиент с помощью Ctrl+C
, я получаю ошибку Exception ignored in: <module 'threading'
. Я хотел устранить ошибку, поэтому вместо этого переключился на multiprocessing.Process
, прочитав, что Ctrl+C
является особенно сложной задачей для потоков. После переключения на multiprocessing.Process
код перестал работать и выдал следующую загадочную ошибку:
Traceback (most recent call last):
File "/home/inspironxps/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
self.run()
File "/home/inspironxps/Downloads/netclient.py", line 104, in run
self.ioloop.current().start()
File "/home/inspironxps/.local/lib/python3.7/site-packages/tornado/platform/asyncio.py", line 148, in start
self.asyncio_loop.run_forever()
File "/home/inspironxps/.pyenv/versions/3.7.4/lib/python3.7/asyncio/base_events.py", line 534, in run_forever
self._run_once()
File "/home/inspironxps/.pyenv/versions/3.7.4/lib/python3.7/asyncio/base_events.py", line 1771, in _run_once
handle._run()
File "/home/inspironxps/.pyenv/versions/3.7.4/lib/python3.7/asyncio/events.py", line 88, in _run
self._context.run(self._callback, *self._args)
File "/home/inspironxps/.local/lib/python3.7/site-packages/tornado/platform/asyncio.py", line 138, in _handle_events
handler_func(fileobj, events)
File "/home/inspironxps/.local/lib/python3.7/site-packages/zmq/eventloop/zmqstream.py", line 452, in _handle_events
zmq_events = self.socket.EVENTS
File "/home/inspironxps/.local/lib/python3.7/site-packages/zmq/sugar/attrsettr.py", line 48, in __getattr__
return self._get_attr_opt(upper_key, opt)
File "/home/inspironxps/.local/lib/python3.7/site-packages/zmq/sugar/attrsettr.py", line 52, in _get_attr_opt
return self.get(opt)
File "zmq/backend/cython/socket.pyx", line 496, in zmq.backend.cython.socket.Socket.get
File "zmq/backend/cython/socket.pyx", line 262, in zmq.backend.cython.socket._getsockopt
File "zmq/backend/cython/checkrc.pxd", line 15, in zmq.backend.cython.checkrc._check_rc
File "<frozen importlib._bootstrap>", line 983, in _find_and_load
File "<frozen importlib._bootstrap>", line 963, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 906, in _find_spec
File "<frozen importlib._bootstrap_external>", line 1280, in find_spec
File "<frozen importlib._bootstrap_external>", line 1252, in _get_spec
File "<frozen importlib._bootstrap_external>", line 1391, in find_spec
Я планирую использовать аналогичную конфигурацию на сервере, чтобы ускорить несколько процессов / потоков, где каждый из этих процессовне делает то же самое. Итак, мой вопрос как мне безопасно завершить работу клиента или еще лучше, как лучше подойти к решению вышеуказанной проблемы ? Я использую Tornado 6.0.2, PyZMQ 18.1.0 и Kubuntu 18.04.2 LTS.
Код netclient
# netclient.py
import zmq
import logging
import asyncio
from ast import literal_eval
from multiprocessing import Process
# from threading import Thread
from zmq.eventloop.zmqstream import ZMQStream
from tornado import ioloop, websocket
# from serverenums import GameStatus
# from tornado.platform.asyncio import AnyThreadEventLoopPolicy
# asyncio.set_event_loop_policy(AnyThreadEventLoopPolicy())
asyncio.set_event_loop(asyncio.new_event_loop())
class NetClient(Process):
def __init__(self, port, server_url, **kwargs):
super(NetClient, self).__init__()
self.logger = logging.getLogger(__name__)
self.wsconn = None
self.wsconn_close = False
self.has_wsconn_initialised = False
# self.has_sent_wsconn_initialised_msg = False
self.forced_exit = False
self.game_url = server_url
self.ctx = zmq.Context.instance()
self.socket = self.ctx.socket(zmq.PAIR)
# do not use localhost, because it would complain
# with error [zmq.error.ZMQError: No such device]
# self.socket.bind('inproc://uisocket')
self.socket.bind('tcp://127.0.0.1:{}'.format(port))
self.stream = ZMQStream(self.socket)
self.stream.on_recv(self.communicate_with_server)
self.logger.debug(f'Running on a port: {str(port)}')
self.ioloop = ioloop.IOLoop()
async def read_with_websocket(self):
if self.wsconn_close == True:
self.wsconn.close()
self.socket.send_pyobj(dict(
cmd='ENDED'
))
return
msg_recv = await self.wsconn.read_message()
self.logger.debug(f'Received from websocket={msg_recv}')
if msg_recv is None:
self.wsconn.close()
self.socket.send_pyobj(dict(
cmd='ENDED'
))
msg = literal_eval(msg_recv)
self.logger.debug('Received from websocket(Decoded)={}'.format(str(msg)))
return msg
async def send_with_websocket(self, msg):
if msg:
if isinstance(self.wsconn,
websocket.WebSocketClientConnection):
self.logger.debug('Sending game request')
# Cannot send a dict(somehow)
await self.wsconn.write_message(str(msg))
else:
self.forced_exit = True
raise RuntimeError("Websocket connection closed")
else:
self.wsconn_close = True
async def init_game_conn(self):
self.logger.debug('Creating initial game server connection')
try:
self.wsconn = await websocket.websocket_connect(self.game_url)
except Exception as err:
print("Connection error: {}".format(err))
else:
self.logger.debug('Connection established')
# if self.wsconn and not self.has_wsconn_initialised:
# self.has_wsconn_initialised = True
# self.socket.send_pyobj(dict(cmd='STARTED', msg='Connection established'))
self.has_wsconn_initialised = True
async def communicate_with_server(self):
while True:
msg_recv = self.socket.recv_pyobj()
self.logger.debug(f'Received from playerui={str(msg_recv)}')
if msg_recv.get('cmd') == 'ENDED' and not self.forced_exit:
self.cleanup()
elif not self.has_wsconn_initialised:
await self.init_game_conn()
else:
await self.send_with_websocket(msg_recv)
msg_recv = await self.read_with_websocket()
self.socket.send_pyobj(msg_recv)
def cleanup(self):
self.forced_exit = True
self.socket.close()
self.ctx.term()
self.logger.debug('Closing the netclient')
self.ioloop.stop()
def run(self):
self.logger.debug(f'Running on ioloop = ${str(self.ioloop)}')
self.ioloop.current().spawn_callback(self.communicate_with_server)
self.ioloop.current().start()
Код клиента
# client.py
import zmq
import sys
import logging
import uuid
from tornado import ioloop
# from serverenums import GameStatus
from netclient import NetClient
rootlogger = logging.getLogger()
rootlogger.setLevel(logging.DEBUG)
stream_handler = logging.StreamHandler(sys.stdout)
stream_handler.setLevel(logging.INFO)
stream_handler.setFormatter(logging.Formatter(logging.BASIC_FORMAT))
file_handler = logging.handlers.RotatingFileHandler(f'cmdui_{str(uuid.uuid4())}.log', maxBytes=(1048576*5), backupCount=7)
file_handler.setLevel(logging.DEBUG)
f_format = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(f_format)
rootlogger.addHandler(stream_handler)
rootlogger.addHandler(file_handler)
def get_random_port():
import socket
s = socket.socket()
s.bind(("", 0))
port_num = s.getsockname()[1]
s.close()
return port_num
class CmdUI(object):
def __init__(self, port):
self.logger = logging.getLogger(__name__)
self.ctx = zmq.Context()
self.socket = self.ctx.socket(zmq.PAIR)
self.socket.connect('tcp://127.0.0.1:{0}'.format(port))
self.msg_recv = None
def get_str_input(self, question):
while True:
try:
choice = input(question)
if any((choice is None, not choice.strip())):
print('Error: Empty string entered!!!')
self.logger.error('Error: Empty string entered!!!')
else:
return choice
except Exception:
raise
def get_int_input(self, question):
while True:
try:
choice = self.get_str_input(question)
choice = int(choice)
return choice
except ValueError as err:
print(err)
except Exception:
raise
def close_game(self):
self.logger.debug(f'Closing due to application panic')
self.socket.send_pyobj(dict(
cmd='ENDED'
))
self.socket.close()
self.ctx.term()
self.logger.debug(f'Closing game')
sys.exit(0)
def start(self):
self.logger.info('CmdUI')
msg_recv = None
try:
while True:
try:
msg_snd = self.get_str_input('Enter a message: ')
self.socket.send_pyobj(dict(cmd=msg_snd))
# Try to receive message
msg_recv = self.socket.recv_pyobj(flags=zmq.NOBLOCK)
except zmq.ZMQError as exc:
if exc.errno == zmq.EAGAIN:
pass
else:
raise
self.logger.info(f'Received {str(msg_recv)}')
if msg_recv and msg_recv.get('cmd') == 'ENDED':
print('Player ended game session')
self.logger.debug('Player ended game session')
self.close_game()
except Exception:
self.close_game()
def main():
ui, netclient = None, None
try:
port = get_random_port()
netclient = NetClient(port, 'ws://localhost:8888/wshandler')
ui = CmdUI(port)
netclient.start()
ui.start()
except SystemExit:
netclient.join()
ui.logger.debug('Client appl ended')
if __name__ == '__main__':
main()
Спасибо.