Фьючерсы: установить результат из другого потока - PullRequest
0 голосов
/ 31 октября 2018

У меня есть http-сервер, который получает несколько запросов в потоке. Этот http-сервер анализирует полученные данные и отправляет часть данных другому процессу.

Ответ от этого процесса может занять некоторое время и будет получен при обратном вызове в другом потоке.

Проблема здесь в том, что мне нужна часть ответа, полученного в функции обратного вызова, чтобы ответить на первоначальный запрос, который я получил, поэтому моя идея заключалась в том, чтобы использовать asyncio.Task для этого

Мой код выглядит следующим образом:

my_futures = dict()

class HTTPHandler(http.server.BaseHTTPRequestHandle):
    do_POST(self):
        #parse data
        asyncio.create_task(self.send_response())

    async def send_response(self):
        global my_futures
        my_futures[uuid] = asyncio.get_event_loop().create_future()
        send_data_to_processB()
        await my_future
        self.send_response()

# Callback where I receive the result from process B
def on_message(message):
    global my_futures
    my_futures[message['uuid']].set_result(message['result'])

if __name__ == '__main__':
    server = http.server.HTTPServer(LISTEN_ADDR, MyHTTPHandler)
    threading.Thread(target=server.serve_forever).start()
    processB.register_callback(on_message)

Проблема этого подхода заключается в том, что задача не выполняется и обратный вызов не получает результатов вообще.

Я также попытался изменить метод do_POST, чтобы использовать asyncio.run(self.send_response()) вместо asyncio.create_task, используя этот подход, мой обратный вызов получает результаты и устанавливает результат, но сопрограмма просто висит на await my_future

Как мне решить эту задачу?

1 Ответ

0 голосов
/ 01 ноября 2018

Чтобы взаимодействовать с asyncio извне потока цикла событий, используйте call_soon_threadsafe:

def on_message(message):
    # my_loop must have been initialized from the main thread
    my_loop.call_soon_threadsafe(
        my_futures[message['uuid']].set_result, message['result'])

Однако это не единственная проблема с вашим кодом. Вы никогда не запускаете цикл обработки событий, поэтому ни один из них не имеет возможности выполнить. Вам нужно иметь asyncio.run или эквивалентный код где-то в вашем коде, обычно на верхнем уровне.

Поскольку HTTPServer не написано для asyncio, вы не можете вызывать функции asyncio из do_POST. Вместо этого посмотрите на использование aiohttp.

...