Как эффективно использовать asyncio при вызове метода на BaseProxy? - PullRequest
0 голосов
/ 11 февраля 2019

Я работаю над приложением, которое использует LevelDB и которое использует несколько долгоживущих процессов для различных задач.

Поскольку LevelDB позволяет только одному процессу поддерживать соединение с базой данных, доступ ко всем нашим базам данныхнаправляется через специальный процесс базы данных .

Для доступа к базе данных из другого процесса мы используем BaseProxy.Но поскольку мы используем asyncio, наш прокси не должен блокировать эти API, которые вызывают процесс db, который затем в конечном итоге читает из db.Поэтому мы реализуем API на прокси с помощью исполнителя.

    loop = asyncio.get_event_loop()

    return await loop.run_in_executor(
        thread_pool_executor,
        self._callmethod,
        method_name,
        args,
    )

И хотя это работает просто отлично, мне интересно, есть ли лучшая альтернатива обертыванию вызова _callmethod BaseProxy в ThreadPoolExecutor.

Насколько я понимаю, вызов BaseProxy в процесс DB является учебным примером ожидания ввода-вывода, поэтому использование потока для этого кажется ненужным расточительным.

Видеальный мир, я бы предположил, что async _acallmethod существует на BaseProxy, но, к сожалению, API не существует.

Итак, мой вопрос сводится к следующему: при работе с BaseProxyболее эффективная альтернатива выполнению этих межпроцессных вызовов в ThreadPoolExecutor?

Ответы [ 3 ]

0 голосов
/ 15 февраля 2019

Если у вас есть питон и база данных, работающие в одной и той же системе (то есть вы не хотите async никаких сетевых вызовов), у вас есть два варианта.

  1. что вы уже делаете (запустите в executor).Он блокирует поток БД, но основной поток остается свободным, чтобы делать другие вещи.Это не просто неблокирование, но это вполне приемлемое решение для случаев блокировки ввода / вывода, с небольшими накладными расходами на поддержание потока.

  2. Для настоящего неблокирующего решения (который может быть запущен в одном потоке без блокировки) вы должны иметь # 1.встроенная поддержка async (обратный вызов) из БД для каждого вызова извлечения и # 2 оборачивают это в вашей пользовательской реализации цикла событий.Здесь вы создаете подкласс цикла Base и переписываете методы для интеграции ваших обратных вызовов db.Например, вы можете создать базовый цикл, который реализует конвейерный сервер.БД пишет в канал, а Python опрашивает канал.См. Реализацию цикла событий Proactor в базе кода asyncio.Примечание: я никогда не реализовывал никакого пользовательского цикла событий.

Я не знаком с leveldb, но для хранилища значений ключей неясно, будет ли какое-либо существенное преимущество длятакой обратный вызов для выборки и чистой неблокирующей реализации.Если вы получаете несколько выборок внутри итератора, и это ваша основная проблема, вы можете сделать цикл async (при этом каждая выборка все еще блокируется) и повысить производительность.Ниже приведен фиктивный код, объясняющий это.

import asyncio
import random
import time

async def talk_to_db(d):
    """ 
        blocking db iteration. sleep is the fetch function.
    """
    for k, v in d.items():
        time.sleep(1)
        yield (f"{k}:{v}")

async def talk_to_db_async(d):
    """ 
        real non-blocking db iteration. fetch (sleep) is native async here 
    """
    for k, v in d.items():
        await asyncio.sleep(1)
        yield (f"{k}:{v}")

async def talk_to_db_async_loop(d):
    """ 
        semi-non-blocking db iteration. fetch is blocking, but the
        loop is not.
    """
    for k, v in d.items():
        time.sleep(1)
        yield (f"{k}:{v}")
        await asyncio.sleep(0)

async def db_call_wrapper(db):
    async for row in talk_to_db(db):
        print(row)

async def db_call_wrapper_async(db):
    async for row in talk_to_db_async(db):
        print(row)

async def db_call_wrapper_async_loop(db):
    async for row in talk_to_db_async_loop(db):
        print(row)

async def func(i):
    await asyncio.sleep(5)
    print(f"done with {i}")

database = {i:random.randint(1,20) for i in range(20)}

async def main():
    db_coro = db_call_wrapper(database)
    coros = [func(i) for i in range(20)]
    coros.append(db_coro)
    await asyncio.gather(*coros)

async def main_async():
    db_coro = db_call_wrapper_async(database)
    coros = [func(i) for i in range(20)]
    coros.append(db_coro)
    await asyncio.gather(*coros)

async def main_async_loop():
    db_coro = db_call_wrapper_async_loop(database)
    coros = [func(i) for i in range(20)]
    coros.append(db_coro)
    await asyncio.gather(*coros)

# run the blocking db iteration
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

# run the non-blocking db iteration
loop = asyncio.get_event_loop()
loop.run_until_complete(main_async())

# run the non-blocking (loop only) db iteration
loop = asyncio.get_event_loop()
loop.run_until_complete(main_async_loop())

Это то, что вы можете попробовать.В противном случае, я бы сказал, что ваш нынешний метод достаточно эффективен.Я не думаю, что BaseProxy может дать вам асинхронный вызов API, он не знает, как обрабатывать обратный вызов из вашей базы данных.

0 голосов
/ 17 февраля 2019

К сожалению, многопроцессорная библиотека не подходит для преобразования в asyncio, то, что у вас есть, лучшее, что вы можете сделать, если вам нужно использовать BaseProxy для обработки вашего IPC (межпроцессного взаимодействия).

ХотяЭто правда, что библиотека использует блокирующий ввод / вывод, здесь вы не можете легко достучаться и переработать блокирующие части, чтобы вместо них использовать неблокирующие примитивы.Если бы вы настаивали на том, чтобы идти по этому пути, вам пришлось бы исправлять или переписывать внутренние подробности реализации этой библиотеки, но, будучи внутренними подробностями реализации, они могут отличаться от точечного выпуска Python до точечного выпуска, делая любое исправление хрупким и склонным к поломке с незначительнымиОбновления Python.Метод _callmethod является частью глубокой иерархии абстракций, включающей потоки, соединения сокетов или каналов и сериализаторы.См. multiprocessing/connection.py и multiprocessing/managers.py.

Так что ваши варианты здесь - придерживаться вашего текущего подхода (использование исполнителя пула потоков для передачи BaseProxy._callmethod() другомунить) o r для реализации собственного решения IPC с использованием примитивов asyncio.Ваш центральный процесс доступа к базе данных будет действовать как сервер для подключения других ваших процессов в качестве клиента, используя либо сокеты, либо именованные каналы, используя согласованную схему сериализации для клиентских запросов и ответов сервера.Это то, что multiprocessing реализует для вас, но вы бы реализовали свою собственную (более простую) версию, используя asyncio streams и любую схему сериализации, наиболее подходящую для ваших шаблонов приложений (например,рассол, JSON, протобуферы или что-то еще целиком).

0 голосов
/ 12 февраля 2019

Пул потоков - это то, что вы хотите.aioprocessing обеспечивает некоторую асинхронную функциональность многопроцессорной обработки, но делает это с использованием потоков, как вы предложили.Я предлагаю создать проблему с python, если ее не существует для предоставления истинной асинхронной многопроцессорной обработки.

https://github.com/dano/aioprocessing

В большинстве случаев эта библиотека делает асинхронными вызовы блокировки многопроцессорных методов.выполнив вызов в ThreadPoolExecutor

...