TLDR; Как мне сделать «один файл» asyncio.Queue()
и передать ему мои команды adb, чтобы они выполнялись в порядке их получения (по одному), обрабатывать ошибки, которые могут возникнуть (отключить / восстановить) во время одной из задач и продолжить обработку остальной части очереди после обработки ошибки?
Я работаю над модулем, который использует существующий модуль python-adb , чтобы в конечном итоге управлять моим android-планшетом как мультимедийным устройством и включать его в мои настройки домашней автоматизации.
Проблема:
Мой модуль полностью построен вокруг async
, а модуль python-adb
- нет. Модуль python-adb
также не управляет / регулирует запросы. И я очень быстро обнаружил, что, если несколько команд adb запрашиваются слишком быстро, соединение adb перегружается, что приводит к ошибке и требует повторного подключения при каждом отключении.
Моему другу удалось внедрить обходное / хакерское решение. Примечание: self._adb_lock
& self._adb_error
изначально установлены в функции AndroidDevice
класса *1023*.
def adb_wrapper(func):
"""Wait if previous ADB commands haven't finished."""
@functools.wraps(func)
async def _adb_wrapper(self, *args, **kwargs):
attempts = 0
while self._adb_lock and attempts < 5:
attempts += 1
await asyncio.sleep(1)
if (attempts == 4 and self._adb_lock) or self._adb_error:
try:
await self.connect()
self._adb_error = False
except self._exceptions:
logging.error('Failed to re-establish the ADB connection; '
'will re-attempt in the next update.')
self._adb = None
self._adb_lock = False
self._adb_error = True
return
self._adb_lock = True
try:
returns = await func(self, *args, **kwargs)
except self._exceptions:
returns = None
logging.error('Failed to execute an ADB command; will attempt to '
're-establish the ADB connection in the next update')
self._adb = None
self._adb_error = True
finally:
self._adb_lock = False
return returns
return _adb_wrapper
С помощью этого обходного пути я поместил декоратор @adb_wrapper
над всеми функциями, которые выполняют вызовы adb. Однако это ужасно неэффективно, и на устройствах более высокого уровня не предотвращается перегрузка соединения adb.
Введите asyncio
Позвольте мне начать с того, что у меня очень мало опыта работы с asyncio
на данный момент; поэтому было легко выбрать, какие вопросы, которые уже были опубликованы, помогли бы мне. Итак, мои извинения, если ответ уже присутствует в другом месте. Кроме того, чтобы дать людям представление о том, как работает моя библиотека, кодовый блок будет немного длинным, но я включил только часть файла (несколько функций, чтобы показать, как я в конечном итоге взаимодействую), и я попытался только включить функции, которые подключаются, чтобы показать цепочку команд.
Моя идея решения:
Моя цель состоит в том, чтобы иметь возможность использовать asyncio
, чтобы ставить в очередь все команды и отправлять их по одной за раз, и если в какой-то момент команда не выполнится (что приведет к отключению adb), я хочу восстановить соединение adb и продолжить с очередью команд.
Текущая структура кода:
class AndroidTV:
""" Represents an Android TV device. """
def __init__(self, host, adbkey=''):
""" Initialize AndroidTV object.
:param host: Host in format <address>:port.
:param adbkey: The path to the "adbkey" file
"""
self.host = host
self.adbkey = adbkey
self._adb = None
self.state = STATE_UNKNOWN
self.muted = False
self.device = 'hdmi'
self.volume = 0.
self.app_id = None
self.package_launcher = None
self.package_settings = None
self._adb_error = False
self._adb_lock = False
self._exceptions = (TypeError, ValueError, AttributeError,
InvalidCommandError, InvalidResponseError,
InvalidChecksumError, BrokenPipeError)
@adb_wrapper
async def connect(self):
""" Connect to an Android TV device.
Will attempt to establish ADB connection to the given host.
Failure sets state to UNKNOWN and disables sending actions.
"""
try:
if self.adbkey:
signer = Signer(self.adbkey)
# Connect to the device
self._adb = adb_commands.AdbCommands().ConnectDevice(serial=self.host, rsa_keys=[signer])
else:
self._adb = adb_commands.AdbCommands().ConnectDevice(serial=self.host)
if not self.package_settings:
self._adb.Shell("am start -a android.settings.SETTINGS")
await asyncio.sleep(1)
logging.info("Getting Settings App Package")
self.package_settings = await self.current_app
if not self.package_launcher:
await self.home()
await asyncio.sleep(1)
logging.info("Getting Launcher App Package")
self.package_launcher = await self.current_app
except socket_error as serr:
logging.warning("Couldn't connect to host: %s, error: %s", self.host, serr.strerror)
@adb_wrapper
async def update(self):
""" Update the device status. """
# Check if device is disconnected.
if not self._adb:
self.state = STATE_UNKNOWN
self.app_id = None
# Check if device is off.
elif not await self._screen_on:
self.state = STATE_OFF
self.app_id = None
else:
self.app_id = await self.current_app
if await self._wake_lock:
self.state = STATE_PLAYING
elif self.app_id not in (self.package_launcher, self.package_settings):
# Check if state was playing on last update
if self.state == STATE_PLAYING:
self.state = STATE_PAUSED
elif self.state != STATE_PAUSED:
self.state = STATE_IDLE
else:
# We're on either the launcher or in settings
self.state = STATE_ON
# Get information from the audio status.
audio_output = await self._dump('audio')
stream_block = re.findall(BLOCK_REGEX, audio_output,
re.DOTALL | re.MULTILINE)[0]
self.muted = re.findall(MUTED_REGEX, stream_block,
re.DOTALL | re.MULTILINE)[0] == 'true'
@property
async def current_app(self):
filtered_dump = await self._dump("window windows", "mCurrentFocus")
current_focus = filtered_dump.replace("\r", "")
matches = WINDOW_REGEX.search(current_focus)
if matches:
(pkg, activity) = matches.group('package', 'activity')
return pkg
else:
logging.warning("Couldn't get current app, reply was %s", current_focus)
return None
@property
async def _screen_on(self):
return await self._dump_has('power', 'Display Power', 'state=ON')
@property
async def _awake(self):
return await self._dump_has('power', 'mWakefulness', 'Awake')
@property
async def _wake_lock(self):
return not await self._dump_has('power', 'Locks', 'size=0')
@adb_wrapper
async def _input(self, cmd):
if not self._adb:
return
self._adb.Shell('input {0}'.format(cmd))
@adb_wrapper
async def _dump(self, service, grep=None):
if not self._adb:
return
if grep:
return self._adb.Shell('dumpsys {0} | grep "{1}"'.format(service, grep))
return self._adb.Shell('dumpsys {0}'.format(service))
async def _dump_has(self, service, grep, search):
dump_result = await self._dump(service, grep=grep)
return dump_result.strip().find(search) > -1
Как я уже говорил, вышеупомянутый метод частично работает, но в основном это пластырь.
Единственные команды, которые напрямую делают adb.Shell
вызовы, -
1. async def connect(self)
2. async def update(self)
3. async def _input(self, cmd)
4. async def _dump(self, service, grep=None)
5. async def _key(self, key)
Функции connect
& update
приводят к нескольким adb.Shell
вызовам самим, так что, возможно, именно в этом моя проблема.
Мой (3 части) Вопрос:
1. Как я могу поставить все команды в очередь по мере их поступления?
2. Выполнить их в порядке их получения?
3. Обрабатывать ошибки в любой момент, переподключиться, затем продолжить выполнение оставшейся части очереди команд?
Вот моя неудачная половина попытки сделать это.
import asyncio
async def produce_output(queue, commands):
for command in commands:
#execute the adb command
if 'keypress' in command:
#command contains 'input keypress ENTER'
adb.Shell(command)
#mark the task done because there's nothing to process
queue.task_done()
else:
#command contains 'dumpsys audio'
output = adb.Shell(command)
#put result in queue
await queue.put(output)
async def process_adb(queue):
while True:
output = await queue.get()
#return output (somehow?)
queue.task_done()
async def update():
adb_queue = asyncio.Queue()
asyncio.create_task(produce_output(adb_queue,
[self._screen_on,
self.current_app,
self._wake_lock,
self._dump('audio')]))
#Not sure how to proceed
if not self._adb:
self.state = STATE_UNKNOWN
self.app_id = None
# Check if device is off.
# Fetching result of first item in the queue - self._screen_on
elif not await adb_queue.get():
self.state = STATE_OFF
self.app_id = None
else:
# Fetching result of second item in the queue - self.current_app
self.app_id = await adb_queue.get()
# Fetching result of third item in the queue - self._wake_lock
if await adb_queue.get():
self.state = STATE_PLAYING
elif self.app_id not in (self.package_launcher, self.package_settings):
# Check if state was playing on last update
if self.state == STATE_PLAYING:
self.state = STATE_PAUSED
elif self.state != STATE_PAUSED:
self.state = STATE_IDLE
else:
# We're on either the launcher or in settings
self.state = STATE_ON
# Get information from the audio status.
# Fetching result of fourth item in the queue - self._dump('audio')
audio_output = await adb_queue.get()
stream_block = re.findall(BLOCK_REGEX, audio_output,
re.DOTALL | re.MULTILINE)[0]
self.muted = re.findall(MUTED_REGEX, stream_block,
re.DOTALL | re.MULTILINE)[0] == 'true'