Как обеспечить обработку всех команд (и ошибок) в указанном порядке - PullRequest
0 голосов
/ 08 ноября 2018

TLDR; Как мне сделать «один файл» asyncio.Queue() и передать ему мои команды adb, чтобы они выполнялись в порядке их получения (по одному), обрабатывать ошибки, которые могут возникнуть (отключить / восстановить) во время одной из задач и продолжить обработку остальной части очереди после обработки ошибки?


Я работаю над модулем, который использует существующий модуль python-adb , чтобы в конечном итоге управлять моим android-планшетом как мультимедийным устройством и включать его в мои настройки домашней автоматизации.

Проблема:
Мой модуль полностью построен вокруг async, а модуль python-adb - нет. Модуль python-adb также не управляет / регулирует запросы. И я очень быстро обнаружил, что, если несколько команд adb запрашиваются слишком быстро, соединение adb перегружается, что приводит к ошибке и требует повторного подключения при каждом отключении.

Моему другу удалось внедрить обходное / хакерское решение. Примечание: self._adb_lock & self._adb_error изначально установлены в функции AndroidDevice класса *1023*.

def adb_wrapper(func):
    """Wait if previous ADB commands haven't finished."""
    @functools.wraps(func)
    async def _adb_wrapper(self, *args, **kwargs):
        attempts = 0
        while self._adb_lock and attempts < 5:
            attempts += 1
            await asyncio.sleep(1)
        if (attempts == 4 and self._adb_lock) or self._adb_error:
            try:
                await self.connect()
                self._adb_error = False
            except self._exceptions:
                logging.error('Failed to re-establish the ADB connection; '
                              'will re-attempt in the next update.')
                self._adb = None
                self._adb_lock = False
                self._adb_error = True
                return

        self._adb_lock = True
        try:
            returns = await func(self, *args, **kwargs)
        except self._exceptions:
            returns = None
            logging.error('Failed to execute an ADB command; will attempt to '
                          're-establish the ADB connection in the next update')
            self._adb = None
            self._adb_error = True
        finally:
            self._adb_lock = False

        return returns

    return _adb_wrapper

С помощью этого обходного пути я поместил декоратор @adb_wrapper над всеми функциями, которые выполняют вызовы adb. Однако это ужасно неэффективно, и на устройствах более высокого уровня не предотвращается перегрузка соединения adb.

Введите asyncio
Позвольте мне начать с того, что у меня очень мало опыта работы с asyncio на данный момент; поэтому было легко выбрать, какие вопросы, которые уже были опубликованы, помогли бы мне. Итак, мои извинения, если ответ уже присутствует в другом месте. Кроме того, чтобы дать людям представление о том, как работает моя библиотека, кодовый блок будет немного длинным, но я включил только часть файла (несколько функций, чтобы показать, как я в конечном итоге взаимодействую), и я попытался только включить функции, которые подключаются, чтобы показать цепочку команд.

Моя идея решения:
Моя цель состоит в том, чтобы иметь возможность использовать asyncio, чтобы ставить в очередь все команды и отправлять их по одной за раз, и если в какой-то момент команда не выполнится (что приведет к отключению adb), я хочу восстановить соединение adb и продолжить с очередью команд.

Текущая структура кода:

class AndroidTV:
    """ Represents an Android TV device. """

    def __init__(self, host, adbkey=''):
        """ Initialize AndroidTV object.
        :param host: Host in format <address>:port.
        :param adbkey: The path to the "adbkey" file
        """
        self.host = host
        self.adbkey = adbkey
        self._adb = None
        self.state = STATE_UNKNOWN
        self.muted = False
        self.device = 'hdmi'
        self.volume = 0.
        self.app_id = None

        self.package_launcher = None
        self.package_settings = None

        self._adb_error = False
        self._adb_lock = False
        self._exceptions = (TypeError, ValueError, AttributeError,
                            InvalidCommandError, InvalidResponseError,
                            InvalidChecksumError, BrokenPipeError)

    @adb_wrapper
    async def connect(self):
        """ Connect to an Android TV device.
        Will attempt to establish ADB connection to the given host.
        Failure sets state to UNKNOWN and disables sending actions.
        """
        try:
            if self.adbkey:
                signer = Signer(self.adbkey)

                # Connect to the device
                self._adb = adb_commands.AdbCommands().ConnectDevice(serial=self.host, rsa_keys=[signer])
            else:
                self._adb = adb_commands.AdbCommands().ConnectDevice(serial=self.host)

            if not self.package_settings:
                self._adb.Shell("am start -a android.settings.SETTINGS")
                await asyncio.sleep(1)
                logging.info("Getting Settings App Package")
                self.package_settings = await self.current_app
            if not self.package_launcher:
                await self.home()
                await asyncio.sleep(1)
                logging.info("Getting Launcher App Package")
                self.package_launcher = await self.current_app

        except socket_error as serr:
            logging.warning("Couldn't connect to host: %s, error: %s", self.host, serr.strerror)

    @adb_wrapper
    async def update(self):
        """ Update the device status. """
        # Check if device is disconnected.
        if not self._adb:
            self.state = STATE_UNKNOWN
            self.app_id = None
        # Check if device is off.
        elif not await self._screen_on:
            self.state = STATE_OFF
            self.app_id = None
        else:
            self.app_id = await self.current_app

            if await self._wake_lock:
                self.state = STATE_PLAYING
            elif self.app_id not in (self.package_launcher, self.package_settings):
                # Check if state was playing on last update
                if self.state == STATE_PLAYING:
                    self.state = STATE_PAUSED
                elif self.state != STATE_PAUSED:
                    self.state = STATE_IDLE
            else:
                # We're on either the launcher or in settings
                self.state = STATE_ON

            # Get information from the audio status.
            audio_output = await self._dump('audio')
            stream_block = re.findall(BLOCK_REGEX, audio_output,
                                      re.DOTALL | re.MULTILINE)[0]
            self.muted = re.findall(MUTED_REGEX, stream_block,
                                    re.DOTALL | re.MULTILINE)[0] == 'true'

    @property
    async def current_app(self):
        filtered_dump = await self._dump("window windows", "mCurrentFocus")
        current_focus = filtered_dump.replace("\r", "")
        matches = WINDOW_REGEX.search(current_focus)
        if matches:
            (pkg, activity) = matches.group('package', 'activity')
            return pkg
        else:
            logging.warning("Couldn't get current app, reply was %s", current_focus)
            return None

    @property
    async def _screen_on(self):
        return await self._dump_has('power', 'Display Power', 'state=ON')

    @property
    async def _awake(self):
        return await self._dump_has('power', 'mWakefulness', 'Awake')

    @property
    async def _wake_lock(self):
        return not await self._dump_has('power', 'Locks', 'size=0')

    @adb_wrapper
    async def _input(self, cmd):
        if not self._adb:
            return
        self._adb.Shell('input {0}'.format(cmd))

    @adb_wrapper
    async def _dump(self, service, grep=None):
        if not self._adb:
            return
        if grep:
            return self._adb.Shell('dumpsys {0} | grep "{1}"'.format(service, grep))
        return self._adb.Shell('dumpsys {0}'.format(service))

    async def _dump_has(self, service, grep, search):
        dump_result = await self._dump(service, grep=grep)
        return dump_result.strip().find(search) > -1

Как я уже говорил, вышеупомянутый метод частично работает, но в основном это пластырь.

Единственные команды, которые напрямую делают adb.Shell вызовы, -
1. async def connect(self)
2. async def update(self)
3. async def _input(self, cmd)
4. async def _dump(self, service, grep=None)
5. async def _key(self, key)

Функции connect & update приводят к нескольким adb.Shell вызовам самим, так что, возможно, именно в этом моя проблема.

Мой (3 части) Вопрос:
1. Как я могу поставить все команды в очередь по мере их поступления?
2. Выполнить их в порядке их получения?
3. Обрабатывать ошибки в любой момент, переподключиться, затем продолжить выполнение оставшейся части очереди команд?

Вот моя неудачная половина попытки сделать это.

import asyncio

async def produce_output(queue, commands):
    for command in commands:
        #execute the adb command
        if 'keypress' in command:
            #command contains 'input keypress ENTER'
            adb.Shell(command)
            #mark the task done because there's nothing to process
            queue.task_done()
        else:
            #command contains 'dumpsys audio'
            output = adb.Shell(command)
            #put result in queue
            await queue.put(output)

async def process_adb(queue):
    while True:
        output = await queue.get()
        #return output (somehow?)
        queue.task_done()


async def update():
    adb_queue = asyncio.Queue()
    asyncio.create_task(produce_output(adb_queue,
        [self._screen_on,
         self.current_app,
         self._wake_lock,
         self._dump('audio')]))
    #Not sure how to proceed

    if not self._adb:
        self.state = STATE_UNKNOWN
        self.app_id = None
    # Check if device is off.
    # Fetching result of first item in the queue - self._screen_on
    elif not await adb_queue.get():
        self.state = STATE_OFF
        self.app_id = None
    else:
        # Fetching result of second item in the queue - self.current_app
        self.app_id = await adb_queue.get()

        # Fetching result of third item in the queue - self._wake_lock
        if await adb_queue.get():
            self.state = STATE_PLAYING
        elif self.app_id not in (self.package_launcher, self.package_settings):
            # Check if state was playing on last update
            if self.state == STATE_PLAYING:
                self.state = STATE_PAUSED
            elif self.state != STATE_PAUSED:
                self.state = STATE_IDLE
        else:
            # We're on either the launcher or in settings
            self.state = STATE_ON

        # Get information from the audio status.
        # Fetching result of fourth item in the queue - self._dump('audio')
        audio_output = await adb_queue.get()
        stream_block = re.findall(BLOCK_REGEX, audio_output,
                                  re.DOTALL | re.MULTILINE)[0]
        self.muted = re.findall(MUTED_REGEX, stream_block,
                                re.DOTALL | re.MULTILINE)[0] == 'true'

1 Ответ

0 голосов
/ 08 ноября 2018

Необходимо убедиться, что только одна задача использует соединение adb для выполнения команды в любой момент времени. Это означает, что вам нужно либо использовать примитивы синхронизации для координации доступа, либо использовать очередь для подачи команд одной рабочей задачи для выполнения.

Далее, поскольку соединение adb полностью синхронно и, как и во всех операциях ввода-вывода, относительно медленно , я бы использовал исполнитель пула потоков для запуска операций на соединении adb из цикла asyncio, так что asyncio может свободно запускать некоторые другие задачи, которые в настоящее время не заблокированы в операциях ввода-вывода. В противном случае нет смысла помещать .Shell() команды в сопрограмму async def, вы фактически не сотрудничаете и не освобождаете место для выполнения других задач.

И последнее, но не менее важное: если даже при сериализованном доступе к объекту соединения вы обнаружите, что он не может принимать слишком много команд за промежуток времени, вы захотите использовать какую-то технику ограничения скорости. Я создал реализацию алгоритма asyncio leaky bucket до , которая может позаботиться об этом, если потребуется.

И очередь, или блокировка гарантируют, что команды будут выполняться в порядке очередности поступления, но для очереди потребуется некоторый механизм отложенного ответа для возврата результатов команды. Очередь позволила бы вам поставить в очередь связанные команды (вы можете добавить несколько записей, используя queue.put_nowait() без сдачи или вы можете разрешить группировать команды), без необходимости сначала ждать блокировки.

Поскольку вы хотите повторить попытки подключения, я бы инкапсулировал объект подключения в асинхронный диспетчер контекста , который затем может также обрабатывать команды блокировки и выполнения с исполнителем:

import asyncio
import collections
from concurrent.futures import ThreadPoolExecutor
from functools import partial

try:  # Python 3.7
    base = contextlib.AbstractAsyncContextManager
except AttributeError:
    base = object  # type: ignore

_retry_exceptions = (...,)  # define exceptions on which to retry commands?

class asyncnullcontext(base):
    def __init__(self, enter_result=None):
        self.enter_result = enter_result
    async def __aenter__(self):
        return self.enter_result
    async def __aexit__(self, *excinfo):
        pass

class AsyncADBConnection(base):
    def __init__(
        self,
        host,
        adbkey=None,
        rate_limit=None,
        max_retry=None,
        loop=None
    ):
        self._lock = asyncio.Lock(loop=loop)
        self._max_retry = max_retry
        self._loop = None
        self._connection = None
        self._executor = ThreadPoolExecutor()

        self._connect_kwargs = {
            "serial": host,
            "rsa_keys": [Signer(adbkey)] if adbkey else []
        }

        if rate_limit is not None:
            # max commands per second
            self._limiter = AsyncLeakyBucket(rate_limit, 1, loop=loop)
        else:
            self._limiter = asyncnullcontext()

    async def __aenter__(self):
        await self._lock.acquire()
        await self._ensure_connection()
        return self

    async def __aexit__(self):
        self._lock.release()

    async def _ensure_connection(self):
        if self._connection is not None:
            return
        loop = self._loop or asyncio.get_running_loop()
        connector = partial(
            adb_commands.AdbCommands().ConnectDevice,
            **self._connect_kwargs
        )
        fut = loop.run_in_executor(pool, connector)
        self._connection = await fut

    async def shell(self, command):
        loop = self._loop or asyncio.get_running_loop()
        max_attempts = self._max_retry or 1
        attempts = 0
        while True:
            with self._limiter:
                try:
                    fut = loop.run_in_executor(
                        self._executor,
                        self._connection.Shell,
                        command
                    )
                    return await fut
                except _retry_exceptions as e:
                    attempts += 1
                    if attempts >= max_attempts:
                        raise
                    # re-connect on retry
                    self._connection = None
                    await self._ensure_connection()

Если вы затем используете очередь, используйте Future() экземпляров для передачи результатов.

Помещение задания в очередь становится:

fut = asyncio.Future()
await queue.put((command, fut))
result = await fut

Вы можете заключить это в служебную функцию или объект. Строка await fut возвращается только после того, как будущее получит результат. Для команд, в которых вы не заботитесь о результате, вам нужно набрать await, только если вы хотите убедиться, что команда выполнена.

Потребитель в рабочей задаче, которая управляет соединением, будет использовать:

while True:
    command, fut = await self.queue.get():
    async with self.connection as conn:
        response = await conn.shell(command)
        fut.set_result(response)
    self.queue.task_done()  # optional, only needed when joining the queue

, где self.connection - экземпляр AsyncADBConnection.

...