Фоновый поток asyncio: функция запуска в блокировке основного потока - PullRequest
0 голосов
/ 01 февраля 2019

У меня есть два потока (основной поток и некоторый фоновый поток), и оба имеют свой собственный цикл событий asyncio.

Теперь рассмотрим, что я нахожусь в фоновом потоке, и я хочу что-то выполнить (func_for_main_thread) в основной теме.Выполнение этой асинхронной операции будет следующим:

main_thread_loop.call_soon_threadsafe(func_for_main_thread)

Однако как я могу выполнить эту синхронизацию / блокировку, то есть дождаться выполнения func_for_main_thread?

Связано это это вопрос, который задает тот же вопрос для Qt и описывает ту же функциональность Apple GCD , которая в основном:

dispatch_async(dispatch_get_main_queue(), ^{ /* do sth */ });

против:

dispatch_sync(dispatch_get_main_queue(), ^{ /* do sth */ });

1 Ответ

0 голосов
/ 01 февраля 2019

Если я правильно понял, что вы хотите, ничто не мешает вам передать Future в основной поток, чтобы сделать это выполненным, как только func_for_main_thread готово.В фоновом потоке вы можете ожидать этого будущего.

Другими словами:

import asyncio
from functools import partial


async def called_threadsafe(loop, func):
    current_loop = asyncio.get_event_loop()
    fut = asyncio.Future()

    def call_and_set():
        try:
            res = func()
        except Exception as exc:
            f = partial(fut.set_exception, exc)
            current_loop.call_soon_threadsafe(f)
        else:
            f = partial(fut.set_result, res)
            current_loop.call_soon_threadsafe(f)
    loop.call_soon_threadsafe(call_and_set)  # submit to execute in other thread

    return await fut  # in current thread await other thread executed func and set future

Полный код, демонстрирующий, как это будет работать:

import asyncio
from functools import partial
import threading
import time


async def called_threadsafe(loop, func):
    current_loop = asyncio.get_event_loop()
    fut = asyncio.Future()

    def call_and_set():
        try:
            res = func()
        except Exception as exc:
            f = partial(fut.set_exception, exc)
            current_loop.call_soon_threadsafe(f)
        else:
            f = partial(fut.set_result, res)
            current_loop.call_soon_threadsafe(f)
    loop.call_soon_threadsafe(call_and_set)

    return await fut


# helpers:
_l = threading.Lock()

def info(*args):
    with _l:
        print(*args, threading.get_ident(), flush=True)


def start_bg_loop():
    bg_loop = asyncio.new_event_loop()

    def startup():
        asyncio.set_event_loop(bg_loop)
        bg_loop.run_forever()

    t = threading.Thread(target=startup)
    t.daemon = True
    t.start()

    return bg_loop


# main part:
def func_for_main_thread():
    info('executed in fg thread')
    time.sleep(0.05)
    return 'got result in bg thread'


async def bg_main(fg_loop):
    info('bg_main started')
    await asyncio.sleep(0.1)

    res = await called_threadsafe(fg_loop, func_for_main_thread)
    info(res)

    info('bg_main finished')


async def fg_main(bg_loop):
    info('fg_main started')
    await asyncio.sleep(1)
    info('fg_main finished')


fg_loop = asyncio.get_event_loop()
bg_loop = start_bg_loop()

asyncio.run_coroutine_threadsafe(bg_main(fg_loop), bg_loop)
fg_loop.run_until_complete(fg_main(bg_loop))

Вывод:

fg_main started 2252
bg_main started 5568
executed in fg thread 2252
got result in bg thread 5568
bg_main finished 5568
fg_main finished 2252
...