Из запущенного потока вызовите сопрограмму на другом потоке - PullRequest
0 голосов
/ 28 июня 2018

Я заменяю часть существующей программы. Эта оригинальная программа использует потоки. Этот конкретный класс наследует от threading.Thread те функции, которые мне нужно заменить, но мне нужно сохранить интерфейс таким же.

Функциональность, которую я интегрирую, упакована в библиотеку, которая использует asyncio.

Исходные вызовы в заменяемый класс идут примерно так:

network = Network()
network.start()

network.fetch_something()  # crashes!

network.stop()

Я дошел до того, что мой замещающий класс также наследуется от threading.Thread, и я могу подключиться из метода run к моим бэкэндам через клиентскую библиотеку:

class Network(threading.Thread):
     def __init__(self):
         self._loop = asyncio.new_event_loop()
         self._client = Client()  # this is the library

     def run(self):
         self._loop.run_until_complete(self.__connect())  # works dandy, implementation not shown
         self._loop.run_forever()

     def fetch_something(self):
         return self._loop.run_until_complete(self._client.fetch_something())

Запуск этого кода вызывает исключение:

RuntimeError: Non-thread-safe operation invoked on an event loop other than the current one

Я вроде понимаю, что здесь происходит. В методе run все получилось, потому что вызывающим был тот же поток, который выполнял цикл обработки событий. В другом случае вызывающим был другой поток, поэтому проблема. Как вы могли заметить, я надеялся, что проблема будет решена с помощью того же цикла обработки событий. Увы, это не сработало.

Я действительно хочу сохранить интерфейс в точности таким, как он есть, в противном случае я занимаюсь рефакторингом до конца года. Я мог бы относительно легко передавать аргументы конструктору класса Network. Я попытался передать цикл обработки событий, созданный в главном потоке, но результат был таким же.

(Обратите внимание, что у этого автора есть противоположная проблема: Вызов сопрограммы в потоке )

1 Ответ

0 голосов
/ 29 июня 2018

При планировании сопрограммы из другого потока необходимо использовать asyncio.run_coroutine_threadsafe. Например:

    def fetch_something(self):
        future = asyncio.run_coroutine_threadsafe(
            self._client.fetch_something(), loop)
        return future.result()

run_coroutine_threadsafe планирует сопрограмму с циклом обработки событий потокобезопасным способом и возвращает concurrent.futures.Future. Вы можете использовать возвращенное будущее, чтобы просто дождаться результата, как показано выше, но вы также можете передать его другим функциям, опросить, получен ли результат, или реализовать таймауты.

При объединении потоков и asyncio не забудьте убедиться, что все взаимодействуют с циклом событий из других потоков (даже для вызова чего-то простого, например loop.stop, для реализации Network.stop) выполняется с использованием loop.call_soon_threadsafe и asyncio.run_coroutine_threadsafe.

...