Преобразование небольших функций в сопрограммы - PullRequest
2 голосов
/ 26 апреля 2019

Я чувствую, что в моем понимании асинхронного ввода-вывода есть пробел: Есть ли польза от преобразования небольших функций в сопрограммы в рамках больших сопрограмм? Есть ли польза для этого в сигнализациицикл событий правильно?Зависит ли степень этого преимущества от того, является ли обернутая функция IO или связанной с ЦП?

Пример: у меня есть сопрограмма, download(), которая:

  1. Загрузки JSON-сериализованные байты из конечной точки HTTP через aiohttp.
  2. Сжимает эти байты с помощью bz2.compress() - что само по себе не ожидаемо
  3. Записываетсжатые байты в S3 через aioboto3

Так что части 1 и 3 используют предопределенные сопрограммы из этих библиотек;часть 2 не по умолчанию.

Приведенный пример:

import bz2
import io
import aiohttp
import aioboto3

async def download(endpoint, bucket_name, key):
    async with aiohttp.ClientSession() as session:
        async with session.request("GET", endpoint, raise_for_status=True) as resp:
            raw = await resp.read()  # payload (bytes)
            # Yikes - isn't it bad to throw a synchronous call into the middle
            # of a coroutine?
            comp = bz2.compress(raw)
            async with (
                aioboto3.session.Session()
                .resource('s3')
                .Bucket(bucket_name)
            ) as bucket:
                await bucket.upload_fileobj(io.BytesIO(comp), key)

Как указывалось в комментарии выше, я всегда понимал, что добавление синхронной функции, такой как bz2.compress(), всопрограмма может связываться с этим.(Даже если bz2.compress(), вероятно, в большей степени связан с вводом-выводом, чем с процессором.)

Итак, есть ли вообще какая-то польза для этого типа шаблонов?

async def compress(*args, **kwargs):
    return bz2.compress(*args, **kwargs)

(А теперь comp = await compress(raw) в download().)

Ва-ля, теперь это ожидаемая сопрограмма, потому что подошва return действительна в нативной сопрограмме.Есть ли основания для использования этого?

За этот ответ , я слышал оправдание для случайного добавления asyncio.sleep(0) подобным образом - просто для однократного возврата доцикл обработки событий, в котором вызывающая сопрограмма хочет перерыв.Это правильно?

Ответы [ 2 ]

2 голосов
/ 26 апреля 2019

Итак, есть ли вообще какая-то польза от этого типа эталона?

async def compress(*args, **kwargs):
    return bz2.compress(*args, **kwargs)

Нет никакой выгоды для него вообще.Вопреки ожиданиям, добавление await не гарантирует , что управление будет передано в цикл обработки событий - это произойдет только в том случае, если ожидаемая сопрограмма действительно приостанавливается.Поскольку compress ничего не ждет, он никогда не приостановится, поэтому это сопрограмма только по имени.

Обратите внимание, что добавление await asyncio.sleep(0) в сопрограммы не решает проблему;см. этот ответ для более подробного обсуждения.Если вам нужно запустить функцию блокировки, используйте run_in_executor:

async def compress(*args, **kwargs):
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(None, lambda: bz2.compress(*args, **kwargs))
1 голос
/ 26 апреля 2019

Сопрограммы позволяют запускать что-то одновременно, не параллельно. Они допускают однопоточную кооперативную многозадачность. Это имеет смысл в двух случаях:

  • Вы должны производить результаты в режиме lockstep, как это делают два генератора.
  • Вы хотите, чтобы что-то полезное было сделано, пока другая сопрограмма ожидает ввода / вывода.

Такие вещи, как http-запросы или дисковый ввод-вывод, позволяют другим сопрограммам запускаться, пока они ожидают завершения операции.

bz2.compress() является синхронным и, Полагаю, не освобождает GIL , но освобождает GIL во время работы. Это означает, что никакая значимая работа не может быть выполнена во время его работы. То есть другие сопрограммы не будут работать во время его вызова, хотя другие потоки будут.

Если вы ожидаете, что большой объем данных для сжатия, настолько большой, что накладные расходы на выполнение сопрограммы по сравнению с этим невелики, вы можете использовать bz2.BZ2Compressor и передавать его данными в достаточно небольших блоках ( например, 128 КБ), запишите результат в поток (S3 поддерживает потоковую передачу или вы можете использовать StringIO) и await asyncio.sleep(0) между сжатыми блоками для обеспечения контроля.

Это позволит другим сопрограммам также работать одновременно с сопрограммой сжатия. Возможно, асинхронная загрузка S3 будет происходить параллельно на уровне сокетов, в то время как ваша сопрограмма будет неактивной.

Кстати, сделать ваш компрессор явно асинхронным генератором может быть более простым способом выразить ту же идею.

...