Как я могу уведомить подпрограмму асинхронизации из подпрограммы синхронизации? - PullRequest
6 голосов
/ 31 октября 2019

У меня есть слушатель асинхронных веб-сокетов. Этот слушатель передает сообщения из основного цикла синхронизации. Я хотел бы сообщить слушателю async websockets, что есть новое сообщение для отправки.

В настоящее время я реализовал это с помощью цикла опроса (плохо). Я пытался использовать cond.notify_all (), но его нельзя использовать вне асинхронного кода?

Фрагмент кода:

ws_data = {}
ws_data_lock = threading.Lock()

async def ws_serve(websocket, path):
    global ws_data
    global ws_data_lock

    listen_pair = await websocket.recv()

    p_fen = None

    while True:
        send = None

        with ws_data_lock:
            if p_fen == None or ws_data[listen_pair] != p_fen:
                send = p_fen = ws_data[listen_pair]

        if send:
            await websocket.send(send)

        await asyncio.sleep(0.25)

...

def run_websockets_server():
    start_server = websockets.serve(ws_serve, ws_interface, ws_port)

    asyncio.get_event_loop().run_until_complete(start_server)
    asyncio.get_event_loop().run_forever()

def client_listener():
    while True:
        with ws_data_lock:
            ws_data[pair_id] = (p1_user, p2_user, time.time())
            # here I would like to let all websocket listeners know that
            # there's new data

t = threading.Thread(target=client_listener)
t.start()

run_websockets_server()

Ответы [ 2 ]

4 голосов
/ 09 ноября 2019

Моей первой реакцией будет: перейти к одной модели параллелизма . Либо используйте потоки повсюду, либо сопрограммы повсюду (с ограниченным использованием пула потоков для вещей, которые еще нельзя сделать с помощью asyncio).

У вашего проекта нет веских оснований пытаться смешать две модели,Я подозреваю, что вы начали использовать asyncio только потому, что Python websockets библиотека после того, как уже выбрали потоки. Оставшаяся часть вашего проекта также может быть построена с использованием сопрограмм (например, используя aiomysql для обработки соединений с базой данных и т. Д.).

Однако вы все равно можете объединить две модели, но вам нужноизучите документацию asyncio по , как его использовать в сочетании с потоками . В частности, чтобы отправить информацию из потока в ваши сопрограммы, вам необходимо использовать эти две функции:

  • asyncio.run_coroutine_threadsafe(coro, loop) позволяет добавить сопрограмму в рабочий цикл,и контролируйте эту сопрограмму с помощью Future объекта , если вам нужно что-то вернуть или если вы хотите отменить процедуру.
  • loop.call_soon_threadsafe(callback, *args) позволяет вам позвонить синхронный функционирует в том же потоке, что и цикл. Это полезно для обратных вызовов, которые вызываются из другого потока (например, вы могли бы ожидать сопрограмму для asyncio.Future() объекта и иметь функцию обратного вызова, устанавливающую результат для этого будущего объекта, поэтому передавая результат всопрограмма).

В вашем случае, если вы хотите отправить данные на все текущих подключений через веб-сокет, я бы использовал:

  • aотображение очередей с каждым активным ключом ws_serve задач. ws_serve задачи добавляют свою очередь в это сопоставление и убирают за собой. Затем задачи выбирают элементы для отправки из своей собственной очереди.
  • сопрограмма, которая добавляет информацию ко всем очередям при выполнении.
  • Другие потоки могут использовать asyncio.run_coroutine_threadsafe() для выполнения сопрограммы, которая добавляетв очереди.

Здесь нет необходимости использовать блокировку;сопрограммы имеют гораздо меньше проблем с параллелизмом, сопрограммы, изменяющие словарь, не проблема, если во время манипуляции нет await s (включая итерации по всем очередям).

Если вы инкапсулируете словарь очередей вменеджер контекста, вы можете легко убедиться, что очереди очищены должным образом:

# asyncio section, no thread access
import asyncio
from contextlib import AbstractContextManager


class WSSendQueues(AbstractContextManager):
    def __init__(self):
        self._queues = {}

    async def send_to_all(self, item):
        for queue in self._queues. values():
            queue.put_nowait(item)

    def __enter__(self):
        task = asyncio.current_task()
        self._queues[task] = queue = asyncio.Queue()
        return queue

    def __exit__(self, exc_type, exc_value, traceback):
        task = asyncio.current_task()
        self._queues.pop(task, None)

# global instance of the queues manager
# this has a coroutine `send_to_all()`
ws_queues = WSSendQueues()

def ws_serve(websocket, path):
    with ws_queues as queue:
        listen_pair = await websocket.recv()

        while True:
            to_send = await queue.get()  # blocks until something is available
            try:
                await websocket.send(to_send)
            finally:
                # let the queue know we handled the item
                queue.task_done()

def run_websockets_server(loop):
    start_server = websockets.serve(ws_serve, ws_interface, ws_port)

    loop.run_until_complete(start_server)
    loop.run_forever()

# reference to the asyncio loop *used for the main thread*
main_thread_loop = asyncio.get_event_loop()

# threads section, need access to the main_thread_loop to schedule
# coroutines

def client_listener():
    while True:
        # create the coroutine. THIS DOESN'T RUN IT YET.
        coro = ws_queues.send_to_all((p1_user, p2_user, time.time()))

        # and schedule it to run on the loop. From here on the
        # websockets will automatically receive the data on their respective queues.
        asyncio.run_coroutine_threadsafe(coro, main_thread_loop)


# starting the threads and event loop
t = threading.Thread(target=client_listener)
t.start()

run_websockets_server(main_thread_loop)

Ваш код еще не обрабатывает завершение работы, но я подготовил вышеописанное, чтобы можно было корректно завершать работу асинхронных веб-сокетов.

Вы бы начали с того, что больше не добавляете в очереди, а закрываете потоки, которые добавляют данные в очереди. Тогда вы захотите дождаться всех сопрограмм Queue.join() , чтобы вы знали, что все сокеты завершили отправку данных. Я бы добавил к этому тайм-аут, бессмысленно ждать здесь вечно. Вы можете сделать это сопрограммой в менеджере контекста:

async def join(self, timeout=None):
    """Wait for all the websocket queues to be empty

    If timeout is not none, limit the amount of time to wait.
    """
    tasks = [asyncio.create_task(q.join()) for q in self._queues.values()]
    done, pending = asyncio.wait(tasks, timeout=timeout)
    # cancel any remaining joins
    for task in pending:
        task.cancel()

Как только вы ожидаете в очередях (желательно с ограничением по времени), вы выключите сервер websockets и закроетепетля. Все это, конечно, делается из сопрограммы, которую вы запланировали в главном потоке.

1 голос
/ 08 ноября 2019

Я могу ответить только в виде непроверенных фрагментов кода. Я считаю, что вся программа слишком сложна, чтобы ее можно было изменить и протестировать самостоятельно.

Необходимо решить две проблемы.

1. передать данные из одного потока в процедуру asyncio в другом потоке

Существует только одна опция, и это call_soon_threadsafe () .

Нам нужно сохранить ссылку на цикл:

def run_websockets_server():
    global ws_loop

    ....
    ws_loop = asyncio.get_event_loop()
    ws_loop.run_until_complete(start_server)
    ws_loop.run_forever()

и затем мы можем указать циклу запланировать функцию для немедленного выполнения:

def client_listener():
    ...
            # here I would like to let all websocket listeners know that
            # there's new data
            ws_loop.call_soon_threadsafe(notify_all)

однако мы можем отправить новые данные, чтобы потребителю не приходилось их извлекать.

            # here I would like to let all websocket listeners know that
            # there's new data
            new_data = ...
            ws_loop.call_soon_threadsafe(notify_all, new_data)

Пожалуйста, решите сами, нужна ли блокировка.

2. чтобы передать данные всем ws_serve экземплярам

Давайте использовать asyncio.Queue . Очевидно, что нам нужен один за экземпляр. Сначала должно быть определено хранилище:

# new global variable
ws_queues = set()

каждый экземпляр будет поддерживать свою собственную очередь:

async def ws_serve(websocket, path):
    queue = asyncio.Queue()
    ws_queues.add(queue)
    try:
        ...
        while True:
            new_data = await queue.get()
            # send
    finally:
        ws_queues.remove(queue)

и последний отсутствующий фрагмент:

def notify_all(data):
    for q in ws_queues:
        q.put_nowait(data)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...