В настоящее время я использую asyncio.wait_for
для опроса websocket.recv
, чтобы я также мог позвонить websocket.send
, когда мне нужно:
async def client_reader():
websocket = await websockets.connect(f"ws://localhost:{PORT}")
result = ''
while True:
try:
# Is there a better way to recv from a websocket?
result = result + await asyncio.wait_for(websocket.recv(), timeout=1.0)
if result.endswith('\n'):
result = result.rstrip()
print(f"result = {result}")
result = ''
# I need to do other asyncio here
await some_other_work()
except asyncio.TimeoutError:
pass
Вся документация для asyncio
и websockets
что я мог найти только примеры игрушек. Есть ли лучший способ сделать это?
Это отдельная программа, которая имитирует то, что я делаю:
import asyncio
import websockets
import random
import multiprocessing
import time
PORT = 49152
text = """\
This is a bunch of text that will be used to
simulate a server sending multiple lines of
text to a client with a random amount of delay
between each line.
"""
def get_next_line():
text_list = text.split('\n')
while True:
for line in text_list:
yield line + '\n'
line_generator = get_next_line()
async def delay_server(websocket, path):
while True:
await asyncio.sleep(random.random() * 5.0)
line = next(line_generator)
await websocket.send(line)
def server_func():
try:
start_server = websockets.serve(delay_server, "localhost", PORT)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
except KeyboardInterrupt:
pass
async def some_other_work():
print("I occasionally need to call websocket.send() here")
async def client_reader():
websocket = await websockets.connect(f"ws://localhost:{PORT}")
result = ''
while True:
try:
result = result + await asyncio.wait_for(websocket.recv(), timeout=1.0)
if result.endswith('\n'):
result = result.rstrip()
print(f"result = {result}")
result = ''
await some_other_work()
except asyncio.TimeoutError:
pass
def client_func():
try:
asyncio.run(client_reader())
except KeyboardInterrupt:
pass
server_proc = multiprocessing.Process(target=server_func)
server_proc.daemon = True
server_proc.start()
client_proc = multiprocessing.Process(target=client_func)
client_proc.daemon = True
client_proc.start()
try:
while True:
time.sleep(1.0)
except KeyboardInterrupt:
pass
server_proc.join()
client_proc.join()