Моей первой реакцией будет: перейти к одной модели параллелизма . Либо используйте потоки повсюду, либо сопрограммы повсюду (с ограниченным использованием пула потоков для вещей, которые еще нельзя сделать с помощью asyncio).
У вашего проекта нет веских оснований пытаться смешать две модели,Я подозреваю, что вы начали использовать asyncio только потому, что Python websockets
библиотека после того, как уже выбрали потоки. Оставшаяся часть вашего проекта также может быть построена с использованием сопрограмм (например, используя aiomysql для обработки соединений с базой данных и т. Д.).
Однако вы все равно можете объединить две модели, но вам нужноизучите документацию asyncio по , как его использовать в сочетании с потоками . В частности, чтобы отправить информацию из потока в ваши сопрограммы, вам необходимо использовать эти две функции:
В вашем случае, если вы хотите отправить данные на все текущих подключений через веб-сокет, я бы использовал:
- aотображение очередей с каждым активным ключом
ws_serve
задач. ws_serve
задачи добавляют свою очередь в это сопоставление и убирают за собой. Затем задачи выбирают элементы для отправки из своей собственной очереди. - сопрограмма, которая добавляет информацию ко всем очередям при выполнении.
- Другие потоки могут использовать
asyncio.run_coroutine_threadsafe()
для выполнения сопрограммы, которая добавляетв очереди.
Здесь нет необходимости использовать блокировку;сопрограммы имеют гораздо меньше проблем с параллелизмом, сопрограммы, изменяющие словарь, не проблема, если во время манипуляции нет await
s (включая итерации по всем очередям).
Если вы инкапсулируете словарь очередей вменеджер контекста, вы можете легко убедиться, что очереди очищены должным образом:
# asyncio section, no thread access
import asyncio
from contextlib import AbstractContextManager
class WSSendQueues(AbstractContextManager):
def __init__(self):
self._queues = {}
async def send_to_all(self, item):
for queue in self._queues. values():
queue.put_nowait(item)
def __enter__(self):
task = asyncio.current_task()
self._queues[task] = queue = asyncio.Queue()
return queue
def __exit__(self, exc_type, exc_value, traceback):
task = asyncio.current_task()
self._queues.pop(task, None)
# global instance of the queues manager
# this has a coroutine `send_to_all()`
ws_queues = WSSendQueues()
def ws_serve(websocket, path):
with ws_queues as queue:
listen_pair = await websocket.recv()
while True:
to_send = await queue.get() # blocks until something is available
try:
await websocket.send(to_send)
finally:
# let the queue know we handled the item
queue.task_done()
def run_websockets_server(loop):
start_server = websockets.serve(ws_serve, ws_interface, ws_port)
loop.run_until_complete(start_server)
loop.run_forever()
# reference to the asyncio loop *used for the main thread*
main_thread_loop = asyncio.get_event_loop()
# threads section, need access to the main_thread_loop to schedule
# coroutines
def client_listener():
while True:
# create the coroutine. THIS DOESN'T RUN IT YET.
coro = ws_queues.send_to_all((p1_user, p2_user, time.time()))
# and schedule it to run on the loop. From here on the
# websockets will automatically receive the data on their respective queues.
asyncio.run_coroutine_threadsafe(coro, main_thread_loop)
# starting the threads and event loop
t = threading.Thread(target=client_listener)
t.start()
run_websockets_server(main_thread_loop)
Ваш код еще не обрабатывает завершение работы, но я подготовил вышеописанное, чтобы можно было корректно завершать работу асинхронных веб-сокетов.
Вы бы начали с того, что больше не добавляете в очереди, а закрываете потоки, которые добавляют данные в очереди. Тогда вы захотите дождаться всех сопрограмм Queue.join()
, чтобы вы знали, что все сокеты завершили отправку данных. Я бы добавил к этому тайм-аут, бессмысленно ждать здесь вечно. Вы можете сделать это сопрограммой в менеджере контекста:
async def join(self, timeout=None):
"""Wait for all the websocket queues to be empty
If timeout is not none, limit the amount of time to wait.
"""
tasks = [asyncio.create_task(q.join()) for q in self._queues.values()]
done, pending = asyncio.wait(tasks, timeout=timeout)
# cancel any remaining joins
for task in pending:
task.cancel()
Как только вы ожидаете в очередях (желательно с ограничением по времени), вы выключите сервер websockets и закроетепетля. Все это, конечно, делается из сопрограммы, которую вы запланировали в главном потоке.