Пакетные вызовы дорогой функции - PullRequest
0 голосов
/ 28 января 2020
def main_function(keys):
    """Return the remote value for every key, fetching them one key at a time.

    Each key is sent to ``get_from_remote_location`` in its own
    single-element request, so the remote function is called once per key.
    """
    def fetch_single(key):
        # The one-element unpacking insists on exactly one value per request.
        (value,) = get_from_remote_location([key])
        return value

    return [fetch_single(key) for key in keys]



def get_from_remote_location(keys):
    """Stand-in for the expensive lookup: lazily map each key to ``str(key)``.

    Returns a ``map`` iterator, so nothing is converted until it is consumed.
    """
    stringified = map(str, keys)
    return stringified

У меня есть функция, которая перебирает набор ключей, вызывает дорогую функцию для каждого ключа, чтобы получить значение, и возвращает коллекцию значений. Я хотел бы вызывать дорогую функцию только один раз. Это надуманный пример, и я мог бы просто избежать перебора ключей, но он нужен только для иллюстрации. Ограничения таковы: дорогой вызов должен быть выполнен только один раз, а функция main_function должна перебирать коллекцию ключей и для каждого ключа выполнять вызов, который выглядит как удалённый.

Я думал об использовании asyncio, чтобы сначала собрать ключи, но решения, которые я придумал, были ужасными. Что-то вроде:

# Module-level accumulator: every batched request appends its key here.
keys = []

...


async def get_from_remote_location_batched(key):
    """Sketch: queue ``key``, then wait for the batched call to answer it.

    The helper awaited below is not defined in this snippet; it stands for
    "wait until every key is queued and the single remote call is done".
    """
    keys.append(key)
    batch_result = until_all_keys_have_been_appended_and_remote_call_has_been_done(keys)
    # NOTE(review): the awaited value is discarded, so this coroutine returns
    # None; presumably a ``return`` was intended -- confirm with the author.
    await batch_result[key]

Правка от 2020-01-28:

У меня есть решение. Постараюсь опубликовать завтра.

1 Ответ

0 голосов
/ 30 января 2020
from contextlib import asynccontextmanager, contextmanager
import asyncio
import pprint
import random
import uuid


class Transport:
    """Mailbox shared by one worker coroutine and the batching caller.

    The worker publishes its ``key``; the caller later answers through
    ``set_data``; the worker polls ``get_awaitable`` until that happens.
    """

    def __init__(self):
        """Used for passing information between coroutines.
        """
        self.key = None          # published by the worker
        self.data = None         # answer pushed back by the caller
        self.data_set = False    # separate flag, so that None is a valid answer
        self.coroutine = None    # the worker registered via __call__

    def __call__(self, coroutine):
        """Register the worker coroutine served by this transport.

        This is a plain method on purpose. Wrapping it in
        ``@contextmanager`` (it never yields) only worked by accident, and
        assigning ``self.__call__`` on the instance cannot disable further
        calls, because ``obj(...)`` looks ``__call__`` up on the type.

        Raises:
            RuntimeError: if a coroutine has already been registered.
        """
        if self.coroutine is not None:
            raise RuntimeError(
                'a coroutine is already registered on this transport')
        self.coroutine = coroutine

    def set_data(self, value):
        """Store the answer and mark the transport as fulfilled."""
        self.data = value
        self.data_set = True

    def get_awaitable(self):
        """Yield awaitable as long as data is not set.

        It's used to yield back control so that others can act.
        """
        while not self.data_set:
            # sleep(0) suspends the awaiting task for one event-loop pass.
            yield asyncio.sleep(0)


class TransportGetter:

    def __init__(self):
        """Registry of every Transport handed out so far.

        Lets a single caller talk to many workers (one-to-many).
        """
        self.transports = []

    @contextmanager
    def get(self):
        """Create a Transport, record it, and expose it to the ``with`` body."""
        created = Transport()
        self.transports.append(created)
        yield created

    @property
    def coroutines(self):
        """Coroutines of the recorded transports, in creation order."""
        registered = []
        for item in self.transports:
            registered.append(item.coroutine)
        return registered


def expensive_function(keys):
    """Answer a whole batch at once: map every key to ``key * 10``."""
    # E.g. a database query
    answers = {}
    for key in keys:
        answers[key] = key * 10
    return answers


@asynccontextmanager
async def remote_caller(fn):
    """Call fn when control is given back.

    The ``async with`` body receives a ``TransportGetter`` and registers
    worker coroutines on it. On exit, the workers are started so they can
    publish their keys, ``fn`` is called once with all keys, every worker
    gets its own answer, and the workers are awaited to completion.

    Args:
        fn: callable taking a list of keys and returning a mapping that
            contains an entry for each of those keys.

    Raises:
        Whatever ``fn`` or any worker coroutine raises.
    """
    transport_getter = TransportGetter()

    yield transport_getter

    name = fn.__name__
    print(f'{name} caller is yielding transport getter')

    transports = list(transport_getter.transports)
    if not transports:
        # Nothing was registered: asyncio.wait() rejects an empty collection
        # and there is no reason to pay for the expensive call.
        return

    # asyncio.wait() no longer accepts bare coroutine objects (deprecated in
    # Python 3.8, TypeError since 3.11), so wrap them in tasks explicitly.
    tasks = [asyncio.ensure_future(t.coroutine) for t in transports]
    # timeout=0 gives the workers a chance to run and publish their keys
    # without waiting for them to finish.
    await asyncio.wait(tasks, timeout=0)

    print(f'{name} caller is about to call and retrieve data')
    keys = [t.key for t in transports]
    data = fn(keys)

    print(f'{name} caller retrievied data\n'
          + f'{pprint.pformat(list(data.values()))}\n'
          + f'for keys\n{pprint.pformat(keys)}\n'
          + f'and is pushing individual responses back')
    for transport in transports:
        transport.set_data(data[transport.key])
    print(f'{name} caller is done')

    # Await the workers so that their completion does not depend on how the
    # event loop happens to schedule them during shutdown, and so that their
    # exceptions propagate instead of being silently dropped.
    await asyncio.gather(*tasks)


async def wait_for_data(transport, name):
    """Keep handing control back to the event loop until the data arrives.

    Awaits every awaitable produced by ``transport.get_awaitable()`` and
    prints a progress line labelled with ``name`` before each one.
    """
    progress_line = f'{name} is waiting and giving up control...'
    for pause in transport.get_awaitable():
        print(progress_line)
        await pause


async def scalar_level_coroutine(transport, id_):
    """Stand-in for a per-key procedure that needs the expensive call.

    Works on a single key. Running many of these without batching their
    keys first would mean one expensive call per worker.

    Publishes a random key on ``transport``, waits until the answer is set,
    and returns ``2 ** int(answer)``.
    """
    name = f'Worker {id_}'
    transport.key = key = random.random()
    print(f'{name} pushed its key ({key}) and is giving up control...')

    await wait_for_data(transport, name)

    exponent = int(transport.data)
    calculated_value = 2 ** exponent

    print(f'{name} calculated value {calculated_value} and is done...')
    return calculated_value


def add_coroutine(transport_getter, *args, **kwargs):
    """Create a worker bound to a fresh transport and register it there.

    Extra positional and keyword arguments are forwarded to
    ``scalar_level_coroutine`` after the transport.
    """
    with transport_getter.get() as transport:
        worker = scalar_level_coroutine(transport, *args, **kwargs)
        transport(worker)


async def main():
    """Register four workers (ids 1-4) and serve them with one batched call."""
    async with remote_caller(expensive_function) as transport_getter:
        for worker_id in range(1, 5):
            add_coroutine(transport_getter, worker_id)


if __name__ == '__main__':
    asyncio.run(main())

Вывод:

expensive_function caller is yielding transport getter
Worker 2 pushed its key (0.8549279924668386) and is giving up control...
Worker 2 is waiting and giving up control...
Worker 4 pushed its key (0.15663005612702752) and is giving up control...
Worker 4 is waiting and giving up control...
Worker 1 pushed its key (0.4688385351885074) and is giving up control...
Worker 1 is waiting and giving up control...
Worker 3 pushed its key (0.12254117937847231) and is giving up control...
Worker 3 is waiting and giving up control...
Worker 2 is waiting and giving up control...
Worker 4 is waiting and giving up control...
Worker 1 is waiting and giving up control...
Worker 3 is waiting and giving up control...
expensive_function caller is about to call and retrieve data
expensive_function caller retrievied data
[4.688385351885074, 8.549279924668387, 1.225411793784723, 1.5663005612702752]
for keys
[0.4688385351885074,
 0.8549279924668386,
 0.12254117937847231,
 0.15663005612702752]
and is pushing individual responses back
expensive_function caller is done
Worker 2 calculated value 256 and is done...
Worker 4 calculated value 2 and is done...
Worker 1 calculated value 16 and is done...
Worker 3 calculated value 2 and is done...
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...