Работа с асинхронными методами для шаблона уведомления наблюдателя - PullRequest
0 голосов
/ 23 апреля 2020

Я пытаюсь реализовать свой собственный шаблон наблюдателя как одиночный в python, но у меня возникли проблемы. Идея состоит в том, что любой существующий метод может наблюдать уведомления, и если это уведомление сработает, будет вызван этот существующий метод. Это работает для синхронных методов, но у меня возникают проблемы с запуском асинхронных методов.

Код наблюдателя:

from threading import Lock
import asyncio

class Observer:
    _singleton = None

    def __init__(self):
        # Key is notification name, value is Notification
        self.notifications = {}
        self.notification_lock = Lock()

    @classmethod
    def singleton(cls):
        if not Observer._singleton:
            Observer._singleton = Observer()
        return Observer._singleton

    def observe(self, notification_name, method):
        """Execute 'method' when 'notification_name' is notified."""
        with self.notification_lock:
            if notification_name not in self.notifications:
                self.notifications[notification_name] = Notification(notification_name, method)
            else:
                self.notifications[notification_name].add_observer(method)

    def notify(self, notification_name, notification_info):
        """Notify all methods observing 'notification_name'."""
        with self.notification_lock:
            if notification_name not in self.notifications:
                return
            notification = self.notifications[notification_name]
            for method in notification.methods:
                if asyncio.iscoroutinefunction(method):
                    # This doesn't work.
                    asyncio.run(method(notification_info))
                else:
                    method(notification_info)

class Notification:
    def __init__(self, notification_name, method):
        self.name = notification_name
        self.methods = [method]

    def add_observer(self, method):
        self.methods.append(method)

Добавляемый метод:

class MyClass:
   def __init__(self):
      self.loop = asyncio.get_event_loop()

      # I add the observer here.
      Observer.singleton().observe("notify_async_method", self.async_method)

   async def async_method(self, item):
      await self.other_async_task()

Уведомление метода (из синхронного контекста):

Observer.singleton().notify("notify_async_method", item)

Вместо завершения метода asyn c я вижу RuntimeError: Timeout context manager should be used inside a task, когда async_method () ожидает other_async_task ().

Примечание: эти методы могут выполняться в отдельных потоках.

...