Торнадо Асинхронный пишет - PullRequest
0 голосов
/ 14 сентября 2018

Приведенный ниже код является упрощенной версией TCP-сервера на базе Tornado, который в настоящее время используется для размещения системы Videotex. Этот код был получен из документации Tornado, и сервер некоторое время работал в рабочей среде без проблем, однако есть функция, которую мне нужно добавить.

Система в настоящее время блокируется, пока символ не будет получен от клиента перед возвратом данных через stream.write. Поскольку система обычно работает на скорости 1200 бод на стороне клиента (через модем telnet), это означает, что пользователь должен ждать, пока все записи потока не будут завершены, прежде чем будет обработан следующий символ «введенный пользователем».

Что я хотел бы сделать, так это найти способ, который позволил бы мне отказаться от записи данных в stream.write, если от клиента получен другой символ.

Я новичок в Tornado и довольно плохо знаком с Python, однако в прошлом я кодировал асинхронные функции и многопоточные решения с использованием C #.

Из документации операция stream.write является асинхронной, поэтому я предполагаю, что вызов может вернуться до того, как данные будут полностью записаны. Мне остается думать, что мне нужен метод для отказа / опустошения / продвижения буфера записи для остановки операция записи, если в файле stream.read обнаружен новый символ.

Один из вариантов, который, как мне кажется, дает мне то, что мне нужно, - это как-то выполнить stream.writes в другом потоке, однако, этот подход кажется неуместным при использовании IOLoop Tornado и т. Д.

Есть ли способ дать мне услугу, которую я ищу? Я полностью контролирую код и с удовольствием реструктурирую приложение, если это необходимо.

import logging
import struct
import os
import traceback

from tornado import gen
from tornado.ioloop import IOLoop
from tornado.iostream import StreamClosedError
from tornado.tcpserver import TCPServer

# Configure logging.
logger = logging.getLogger(os.path.basename(__file__))
logger.setLevel(logging.INFO)

# Cache this struct definition; important optimization.
int_struct = struct.Struct("<i")
_UNPACK_INT = int_struct.unpack
_PACK_INT = int_struct.pack


class TornadoServer(TCPServer):

    def start(self, port):

        self.port = port
        server.listen(port)

    @gen.coroutine
    def handle_stream(self, stream, address):

        logging.info("[viewdata] Connection from client address {0}.".format(address))
        try:

            while True:

                char = yield stream.read_bytes(1) # this call blocks

                asc = ord(char)
                logger.info('[viewdata] Byte Received {0} ({1})'.format(hex(asc), asc))

                # Do some processing using the received char and return the appropriate page of data
                stream.write('This is the data you asked for...'.encode())

        except StreamClosedError as ex:
            logger.info("[viewdata] {0} Disconnected: {1} Message: {2}".format(address, type(ex), str(ex)))
        except Exception as ex:
            logger.error("[viewdata] {0} Exception: {1} Message: {2}".format(address, type(ex), str(ex)))
            logger.error(traceback.format_exc())


if __name__ == '__main__':

    server = TornadoServer()
    server.start(25232)

    loop = IOLoop.current()
    loop.start()

1 Ответ

0 голосов
/ 23 сентября 2018

Основная идея заключается в том, что вы переносите длинную обработку в отдельную задачу. Когда вы получаете новые данные, вы выбираете, что делать (в случае ниже я отменяю текущую операцию)

import logging
import os
import traceback

import threading
from tornado import gen
from tornado.ioloop import IOLoop
from tornado.iostream import StreamClosedError
from tornado.tcpserver import TCPServer

# Configure logging.
logger = logging.getLogger(os.path.basename(__file__))
logger.setLevel(logging.INFO)

class TornadoServer(TCPServer):

    def start(self, port):

        self.port = port
        server.listen(port)

    async def process_stream(self, stream, char, cancel_event):
        asc = ord(char)
        logger.info('[viewdata] Byte Received {0} ({1})'.format(hex(asc), asc))
        N = 5
        for i in range(N):
            if cancel_event.is_set():
                logger.info('[viewdata] Abort streaming')
                break
            # Do some processing using the received char and return the appropriate page of data
            msg = 'This is the {0} data you asked for...'.format(i)
            logger.info(msg)
            await stream.write('This is the part {0} of {1} you asked for...'.format(i, N).encode())
            await gen.sleep(1.0)  # make this processing longer..


    async def handle_stream(self, stream, address):

        process_stream_future = None
        cancel_event = None

        logging.info("[viewdata] Connection from client address {0}.".format(address))
        while True:
            try:

                char = await stream.read_bytes(1)  # this call blocks

                # when received client input, cancel running job
                if process_stream_future:
                    process_stream_future.cancel()
                if cancel_event:
                    cancel_event.set()

                cancel_event = threading.Event()
                process_stream_future = gen.convert_yielded(
                    self.process_stream(stream, char, cancel_event))
                self.io_loop.add_future(process_stream_future, lambda f: f.result())

            except StreamClosedError as ex:
                logger.info("[viewdata] {0} Disconnected: {1} Message: {2}".format(address, type(ex), str(ex)))
            except Exception as ex:
                logger.error("[viewdata] {0} Exception: {1} Message: {2}".format(address, type(ex), str(ex)))
                logger.error(traceback.format_exc())


if __name__ == '__main__':

    server = TornadoServer()
    server.listen(25232)

    loop = IOLoop.current()
    loop.start()
...