Python: как выполнять две «агрегатные» функции (например, sum) одновременно, передавая их из одного итератора - PullRequest
0 голосов
/ 26 апреля 2018

Представьте, что у нас есть итератор, скажем, iter(range(1, 1000)). И у нас есть две функции, каждая из которых принимает итератор в качестве единственного параметра, скажем, sum() и max(). В мире SQL мы бы назвали их агрегатными функциями.

Есть ли способ получить результаты обоих без буферизации вывода итератора?

Чтобы сделать это, нам нужно было бы приостановить и возобновить выполнение агрегатной функции, чтобы снабдить их обоих одинаковыми значениями без их сохранения. Может быть, есть способ выразить это, используя асинхронные вещи без снов?

Ответы [ 2 ]

0 голосов
/ 01 августа 2018

Если для ваших агрегатных функций верно значение f(a,b,c,...) == f(a, f(b, f(c, ...))), то вы можете просто циклически проходить по своим функциям и передавать их по одному элементу за раз, каждый раз комбинируя их с результатом предыдущего приложения, как если бы reduce делал например, как это:

def aggregate(iterator, *functions):
    first = next(iterator)
    result = [first] * len(functions)
    for item in iterator:
        for i, f in enumerate(functions):
            result[i] = f((result[i], item))
    return result

Это значительно медленнее (примерно в 10-20 раз), чем просто материализация итератора в списке и применение агрегатной функции к списку в целом, или использование itertools.tee (что в основном делает то же самое внутри), но он имеет преимущество в том, что не использует дополнительную память.

Обратите внимание, однако, что хотя это хорошо работает для таких функций, как sum, min или max, оно не работает для других функций агрегирования, например, найти среднее или медианное значение элемента итератора, как mean(a, b, c) != mean(a, mean(b, c)). (Для mean вы, конечно, могли бы просто получить sum и разделить его на количество элементов, но вычислить, например, медиану, принимающую только один элемент за один раз.)

0 голосов
/ 28 июля 2018

Давайте рассмотрим, как применить две агрегатные функции к одному итератору, которые мы можем исчерпать только один раз. Начальная попытка (которая для краткости жестко кодирует sum и max, но тривиально обобщается на произвольное число агрегатных функций) может выглядеть следующим образом:

def max_and_sum_buffer(it):
    content = list(it)
    p = sum(content)
    m = max(content)
    return p, m

Недостатком этой реализации является то, что она сохраняет все сгенерированные элементы в памяти одновременно, несмотря на то, что обе функции прекрасно способны к потоковой обработке. Вопрос ожидает этого отрыва и явно запрашивает результат, который должен быть получен без буферизации вывода итератора. Возможно ли это сделать?

Серийное исполнение: itertools.tee

Это, конечно, кажется возможным. В конце концов, итераторы Python внешние , поэтому каждый итератор уже способен приостановить сам себя. Насколько сложно может быть адаптер, который разделяет итератор на два новых итератора, которые предоставляют одинаковый контент? На самом деле, это именно то описание itertools.tee, которое идеально подходит для параллельной итерации:

def max_and_sum_tee(it):
    it1, it2 = itertools.tee(it)
    p = sum(it1)  # XXX
    m = max(it2)
    return p, m

Вышеприведенное дает правильный результат, но не работает так, как нам хотелось бы. Проблема в том, что мы не проводим параллельные итерации. Агрегатные функции, такие как sum и max, никогда не приостанавливаются - каждая настаивает на потреблении всего содержимого итератора до получения результата. Так что sum исчерпает it1 до того, как max сможет вообще бежать. Исчерпание элементов it1 при оставлении it2 приведет к тому, что эти элементы будут накапливаться во внутреннем FIFO, совместно используемом двумя итераторами. Это неизбежно - поскольку max(it2) должен видеть одни и те же элементы, у tee нет другого выбора, кроме как накапливать их. (Более интересные подробности о tee см. В этой записи. )

Другими словами, нет никакой разницы между этой реализацией и первой, за исключением того, что первая, по крайней мере, делает буферизацию явной. Чтобы исключить буферизацию, sum и max должны выполняться параллельно, а не один за другим.

Темы: concurrent.futures

Посмотрим, что произойдет, если мы запустим агрегатные функции в отдельных потоках, все еще используя tee для дублирования исходного итератора:

def max_and_sum_threads_simple(it):
    it1, it2 = itertools.tee(it)

    with concurrent.futures.ThreadPoolExecutor(2) as executor:
        sum_future = executor.submit(lambda: sum(it1))
        max_future = executor.submit(lambda: max(it2))

    return sum_future.result(), max_future.result()

Теперь sum и max фактически работают параллельно (насколько позволяет GIL ), потоки управляются превосходным модулем concurrent.futures. Однако у него есть фатальный недостаток: чтобы tee не буферизовать данные, sum и max должны обрабатывать свои элементы с одинаковой скоростью. Если один даже немного быстрее другого, они разойдутся, и tee буферизует все промежуточные элементы. Поскольку невозможно предсказать, как быстро будет работать каждый из них, объем буферизации непредсказуем и имеет самый ужасный случай буферизации всего.

Чтобы гарантировать, что буферизация не происходит, tee необходимо заменить на специальный генератор, который ничего не буферизует и блокирует, пока все потребители не наблюдают предыдущее значение, прежде чем перейти к следующему. Как и прежде, каждый потребитель работает в своем собственном потоке, но теперь вызывающий поток занят выполнением производителя, цикла, который фактически перебирает исходный итератор и сигнализирует о наличии нового значения. Вот реализация:

def max_and_sum_threads(it):
    STOP = object()
    next_val = None
    consumed = threading.Barrier(2 + 1)  # 2 consumers + 1 producer
    val_id = 0
    got_val = threading.Condition()

    def send(val):
        nonlocal next_val, val_id
        consumed.wait()
        with got_val:
            next_val = val
            val_id += 1
            got_val.notify_all()

    def produce():
        for elem in it:
            send(elem)
        send(STOP)

    def consume():
        last_val_id = -1
        while True:
            consumed.wait()
            with got_val:
                got_val.wait_for(lambda: val_id != last_val_id)
            if next_val is STOP:
                return
            yield next_val
            last_val_id = val_id

    with concurrent.futures.ThreadPoolExecutor(2) as executor:
        sum_future = executor.submit(lambda: sum(consume()))
        max_future = executor.submit(lambda: max(consume()))
        produce()

    return sum_future.result(), max_future.result()

Это довольно большой объем кода для чего-то столь простого концептуально, но он необходим для правильной работы.

produce() перебирает внешний итератор и отправляет элементы потребителям по одному значению за раз.Он использует барьер , удобный примитив синхронизации, добавленный в Python 3.2, чтобы дождаться, пока все потребители закончат со старым значением, прежде чем перезаписать его новым значением в next_val.Как только новое значение будет готово, передается условие .consume() - это генератор, который передает полученные значения по мере их поступления до обнаружения STOP.Код может быть обобщен, запускать любое количество агрегатных функций параллельно, создавая потребителей в цикле и корректируя их количество при создании барьера.

Недостатком этой реализации является то, что она требует создания потоков (возможно, облегченных).сделав пул потоков глобальным) и много очень тщательной синхронизации на каждом проходе итерации.Эта синхронизация снижает производительность - эта версия почти в 2000 раз медленнее однопоточной tee и в 475 раз медленнее простой, но недетерминированной многопоточной версии.

Тем не менее, пока используются потоки,нельзя избежать синхронизации в той или иной форме.Чтобы полностью исключить синхронизацию, мы должны отказаться от потоков и перейти к совместной многозадачности.Вопрос в том, можно ли приостановить выполнение обычных синхронных функций, таких как sum и max, чтобы переключаться между ними?

Волокна: гринлет

Получается, что greenlet сторонний модуль расширения позволяет именно это.Greenlets - это реализация волокон , легких микропотоков, которые явно переключаются между собой.Это похоже на генераторы Python, которые используют yield для приостановки, за исключением того, что гринлеты предлагают гораздо более гибкий механизм приостановки, позволяющий выбрать, кого приостановить до .

Это делает егодовольно просто портировать резьбовую версию max_and_sum на гринлеты:

def max_and_sum_greenlet(it):
    STOP = object()
    consumers = None

    def send(val):
        for g in consumers:
            g.switch(val)

    def produce():
        for elem in it:
            send(elem)
        send(STOP)

    def consume():
        g_produce = greenlet.getcurrent().parent
        while True:
            val = g_produce.switch()
            if val is STOP:
                return
            yield val

    sum_result = []
    max_result = []
    gsum = greenlet.greenlet(lambda: sum_result.append(sum(consume())))
    gsum.switch()
    gmax = greenlet.greenlet(lambda: max_result.append(max(consume())))
    gmax.switch()
    consumers = (gsum, gmax)
    produce()

    return sum_result[0], max_result[0]

Логика та же, но с меньшим количеством кода.Как и раньше, produce создает значения, полученные из исходного итератора, но его send не беспокоит синхронизация, так как в этом нет необходимости, когда все является однопоточным.Вместо этого он явно переключается на каждого потребителя по очереди, чтобы сделать свое дело, а потребитель покорно переключается обратно.После прохождения всех потребителей, производитель готов к следующему проходу итерации.

Результаты извлекаются с использованием промежуточного одноэлементного списка, потому что greenlet не обеспечивает доступа к возвращаемому значению целевой функции (и ниthreading.Thread, поэтому мы выбрали concurrent.futures выше).

Однако есть и недостатки использования гринлетов.Во-первых, они не поставляются со стандартной библиотекой, вам нужно установить расширение greenlet.Кроме того, greenlet по своей природе непереносим, ​​поскольку код переключения стека не поддерживается ОС и компилятором и может считаться хаком (хотя чрезвычайно умный один),Python, нацеленный на WebAssembly или JVM или GraalVM , вряд ли будет поддерживать гринлет.Это не насущная проблема, но это определенно что-то, о чем нужно помнить в течение длительного времени.

Сопрограммы: asyncio

Начиная с Python 3.5, Python предоставляет собственные сопрограммы.В отличие от greenlets и аналогично генераторам, сопрограммы отличаются от обычных функций и должны быть определены с использованием async def.Сопрограммы не могут быть легко выполнены из синхронного кода, вместо этого они должны обрабатываться планировщиком, который доводит их до завершения.Планировщик также известен как цикл обработки событий , потому что его другая задача - получать события ввода-вывода и передавать их соответствующим обратным вызовам и сопрограммам.В стандартной библиотеке это роль модуля asyncio.

Перед реализацией max_and_sum на основе asyncio, мы должны сначала устранить препятствие. В отличие от greenlet, asyncio может только приостанавливать выполнение сопрограмм, но не произвольных функций. Таким образом, мы должны заменить sum и max сопрограммами, которые делают по существу то же самое. Это так же просто, как реализовать их очевидным способом, только заменив for на async for, позволив асинхронному итератору приостановить сопрограмму в ожидании следующего значения:

async def asum(it):
    s = 0
    async for elem in it:
        s += elem
    return s

async def amax(it):
    NONE_YET = object()
    largest = NONE_YET
    async for elem in it:
        if largest is NONE_YET or elem > largest:
            largest = elem
    if largest is NONE_YET:
        raise ValueError("amax() arg is an empty sequence")
    return largest

# or, using https://github.com/vxgmichel/aiostream
#
#from aiostream.stream import accumulate
#def asum(it):
#    return accumulate(it, initializer=0)
#def amax(it):
#    return accumulate(it, max)

Можно разумно спросить, обманывает ли предоставление новой пары агрегатных функций; в конце концов, в предыдущих решениях использовались встроенные sum и max. Ответ будет зависеть от точной интерпретации вопроса, но я бы сказал, что новые функции разрешены, потому что они никоим образом не являются специфичными для поставленной задачи. Они делают то же самое, что и встроенные модули, но потребляют асинхронные итераторы. Я подозреваю, что единственная причина, по которой такие функции не существуют где-то в стандартной библиотеке, связана с тем, что сопрограммы и асинхронные итераторы являются относительно новой функцией.

После этого мы можем написать max_and_sum в качестве сопрограммы:

async def max_and_sum_asyncio(it):
    loop = asyncio.get_event_loop()
    STOP = object()

    next_val = loop.create_future()
    consumed = loop.create_future()
    used_cnt = 2  # number of consumers

    async def produce():
        for elem in it:
            next_val.set_result(elem)
            await consumed
        next_val.set_result(STOP)

    async def consume():
        nonlocal next_val, consumed, used_cnt
        while True:
            val = await next_val
            if val is STOP:
                return
            yield val
            used_cnt -= 1
            if not used_cnt:
                consumed.set_result(None)
                consumed = loop.create_future()
                next_val = loop.create_future()
                used_cnt = 2
            else:
                await consumed

    s, m, _ = await asyncio.gather(asum(consume()), amax(consume()),
                                   produce())
    return s, m

Хотя эта версия основана на переключении между сопрограммами внутри одного потока, как и в случае с использованием greenlet, она выглядит иначе. Asyncio не обеспечивает явное переключение сопрограмм, оно основывает переключение задач на await примитиве приостановки / возобновления. Целью await может быть другая сопрограмма, но также и абстрактное «будущее», заполнитель значения, который позже будет заполнен другой сопрограммой. Как только ожидаемое значение становится доступным, цикл обработки событий автоматически возобновляет выполнение сопрограммы с выражением await, оценивающим указанное значение. Таким образом, вместо produce переключения на потребителей, он приостанавливает себя, ожидая будущего, которое наступит, когда все потребители увидят произведенную стоимость.

consume() - это асинхронный генератор , который похож на обычный генератор, за исключением того, что он создает асинхронный итератор, который наши агрегатные сопрограммы уже готовы принять с помощью async for. Эквивалент асинхронного итератора __next__ называется __anext__ и представляет собой сопрограмму, позволяющую сопрограмме, которая исчерпывает асинхронный итератор, приостанавливать ожидание получения нового значения. Когда работающий асинхронный генератор приостанавливается на await, async for наблюдает за приостановкой неявного вызова __anext__. consume() делает именно это, когда ожидает значений, предоставленных produce, и, когда они становятся доступными, передает их для агрегирования сопрограмм, таких как asum и amax. Ожидание осуществляется с использованием next_val future, которое несет следующий элемент из it. Ожидание этого будущего внутри consume() приостанавливает асинхронный генератор, а вместе с ним и совокупную сопрограмму.

Преимущество этого подхода по сравнению с явным переключением гринлетов состоит в том, что он значительно упрощает объединение сопрограмм, которые не знают друг друга, в один и тот же цикл обработки событий. Например, можно иметь два экземпляра max_and_sum, работающих параллельно (в одном потоке), или запустить более сложную агрегатную функцию, которая вызывала дополнительный асинхронный код для выполнения вычислений.

Следующая удобная функция показывает, как запустить вышеприведенный код из неасинхронного кода:

def max_and_sum_asyncio_sync(it):
    # trivially instantiate the coroutine and execute it in the
    # default event loop
    coro = max_and_sum_asyncio(it)
    return asyncio.get_event_loop().run_until_complete(coro)

Performance

Измерение и сравнение производительности этих подходов к параллельному выполнению может вводить в заблуждение, поскольку sum и max почти не обрабатывают, что чрезмерно увеличивает издержки параллелизации. Относитесь к ним так же, как к любым микробенчмаркам с большим количеством соли. Сказав это, давайте все равно посмотрим на цифры!

Измерения были произведены с использованием Python 3.6 Функции были запущены только один раз и получили range(10000), их время измерялось путем вычитания time.time() до и после выполнения. Вот результаты:

  • max_and_sum_buffer и max_and_sum_tee: 0,66 мс - почти одинаковое время для обоих, с версией tee немного быстрее.

  • max_and_sum_threads_simple: 2,7 мсЭто время очень мало значит из-за недетерминированной буферизации, поэтому это может быть измерение времени для запуска двух потоков и синхронизации, выполняемой внутри Python.

  • max_and_sum_threads: 1.29 секунд , самый медленный вариант, примерно в 2000 раз медленнее самого быстрого.Этот ужасный результат, вероятно, вызван сочетанием нескольких синхронизаций, выполняемых на каждом шаге итерации, и их взаимодействия с GIL.

  • max_and_sum_greenlet: 25,5 мс, медленнее по сравнению сначальная версия, но намного быстрее, чем версия с резьбой.С достаточно сложной агрегатной функцией можно представить использование этой версии в производстве.

  • max_and_sum_asyncio: 351 мс, почти в 14 раз медленнее, чем версия с зеленым.Это разочаровывающий результат, потому что сопрограммы asyncio более легкие, чем гринлеты, и переключение между ними должно быть намного быстрее , чем переключение между волокнами.Вполне вероятно, что накладные расходы на запуск планировщика сопрограмм и цикла обработки событий (что в данном случае является избыточным, учитывая, что код не выполняет ввод-вывод) снижают производительность этого микропроцессора.

  • max_and_sum_asyncio с использованием uvloop: 125 мс.Это более чем в два раза превышает скорость обычного asyncio, но все же почти в 5 раз медленнее, чем у greenlet.

Запуск примеров в PyPy не приводит к значительному ускорению, вНа самом деле, большинство примеров работают немного медленнее, даже после их запуска несколько раз, чтобы обеспечить прогрев JIT.Функция asyncio требует переписать , чтобы не использовать асинхронные генераторы (поскольку PyPy на момент написания этой статьи реализует Python 3.5), и выполняется за несколько менее 100 мс.Это сравнимо с производительностью CPython + uvloop, т.е. лучше, но не драматично по сравнению с greenlet.

...