Есть ли лучший способ смешивать websocket recv и отправлять вызовы, чем этот? - PullRequest
1 голос
/ 08 января 2020

В настоящее время я использую asyncio.wait_for для опроса websocket.recv, чтобы я также мог позвонить websocket.send, когда мне нужно:

async def client_reader():
    websocket = await websockets.connect(f"ws://localhost:{PORT}")
    result = ''
    while True:
        try:
            # Is there a better way to recv from a websocket?
            result = result + await asyncio.wait_for(websocket.recv(), timeout=1.0)
            if result.endswith('\n'):
                result = result.rstrip()
                print(f"result = {result}")
                result = ''
            # I need to do other asyncio here
            await some_other_work()
        except asyncio.TimeoutError:
            pass

Вся документация для asyncio и websockets что я мог найти только примеры игрушек. Есть ли лучший способ сделать это?

Это отдельная программа, которая имитирует то, что я делаю:

import asyncio
import websockets
import random
import multiprocessing
import time

PORT = 49152

text = """\
This is a bunch of text that will be used to
simulate a server sending multiple lines of
text to a client with a random amount of delay
between each line.
"""

def get_next_line():
    text_list = text.split('\n')
    while True:
        for line in text_list:
            yield line + '\n'

line_generator = get_next_line()

async def delay_server(websocket, path):
    while True:
        await asyncio.sleep(random.random() * 5.0)
        line = next(line_generator)
        await websocket.send(line)

def server_func():
    try:
        start_server = websockets.serve(delay_server, "localhost", PORT)
        asyncio.get_event_loop().run_until_complete(start_server)
        asyncio.get_event_loop().run_forever()
    except KeyboardInterrupt:
        pass

async def some_other_work():
    print("I occasionally need to call websocket.send() here")

async def client_reader():
    websocket = await websockets.connect(f"ws://localhost:{PORT}")
    result = ''
    while True:
        try:
            result = result + await asyncio.wait_for(websocket.recv(), timeout=1.0)
            if result.endswith('\n'):
                result = result.rstrip()
                print(f"result = {result}")
                result = ''
            await some_other_work()
        except asyncio.TimeoutError:
            pass

def client_func():
    try:
        asyncio.run(client_reader())
    except KeyboardInterrupt:
        pass

server_proc = multiprocessing.Process(target=server_func)
server_proc.daemon = True
server_proc.start()
client_proc = multiprocessing.Process(target=client_func)
client_proc.daemon = True
client_proc.start()
try:
    while True:
        time.sleep(1.0)
except KeyboardInterrupt:
    pass
server_proc.join()
client_proc.join()

1 Ответ

0 голосов
/ 09 января 2020

Я изменил код, чтобы он выглядел следующим образом:

async def client_reader(websocket, result):
    try:
        result = result + await asyncio.wait_for(websocket.recv(), timeout=1.0)
        if result.endswith('\n'):
            result = result.rstrip()
            print(f"result = {result}")
            result = ''
    except asyncio.TimeoutError:
        pass
    return result

async def if_flag_send_message():
    print("if a flag is set, I will call websocket.send here")

async def client_writer(websocket):
    await if_flag_send_message()
    await asyncio.sleep(1.0)

async def client_handler():
    websocket = await websockets.connect(f"ws://localhost:{PORT}")
    result = ''
    while True:
        reader_task = asyncio.create_task(client_reader(websocket, result))
        writer_task = asyncio.create_task(client_writer(websocket))
        await asyncio.gather(reader_task, writer_task)
        result = reader_task.result()

Я все еще не уверен, что это правильный способ сделать что-то, но он работает.

...