Давайте рассмотрим, как применить две агрегатные функции к одному итератору, которые мы можем исчерпать только один раз. Начальная попытка (которая для краткости жестко кодирует sum
и max
, но тривиально обобщается на произвольное число агрегатных функций) может выглядеть следующим образом:
def max_and_sum_buffer(it):
content = list(it)
p = sum(content)
m = max(content)
return p, m
Недостатком этой реализации является то, что она сохраняет все сгенерированные элементы в памяти одновременно, несмотря на то, что обе функции прекрасно способны к потоковой обработке. Вопрос ожидает этого отрыва и явно запрашивает результат, который должен быть получен без буферизации вывода итератора. Возможно ли это сделать?
Серийное исполнение: itertools.tee
Это, конечно, кажется возможным. В конце концов, итераторы Python внешние , поэтому каждый итератор уже способен приостановить сам себя. Насколько сложно может быть адаптер, который разделяет итератор на два новых итератора, которые предоставляют одинаковый контент? На самом деле, это именно то описание itertools.tee
, которое идеально подходит для параллельной итерации:
def max_and_sum_tee(it):
it1, it2 = itertools.tee(it)
p = sum(it1) # XXX
m = max(it2)
return p, m
Вышеприведенное дает правильный результат, но не работает так, как нам хотелось бы. Проблема в том, что мы не проводим параллельные итерации. Агрегатные функции, такие как sum
и max
, никогда не приостанавливаются - каждая настаивает на потреблении всего содержимого итератора до получения результата. Так что sum
исчерпает it1
до того, как max
сможет вообще бежать. Исчерпание элементов it1
при оставлении it2
приведет к тому, что эти элементы будут накапливаться во внутреннем FIFO, совместно используемом двумя итераторами. Это неизбежно - поскольку max(it2)
должен видеть одни и те же элементы, у tee
нет другого выбора, кроме как накапливать их. (Более интересные подробности о tee
см. В этой записи. )
Другими словами, нет никакой разницы между этой реализацией и первой, за исключением того, что первая, по крайней мере, делает буферизацию явной. Чтобы исключить буферизацию, sum
и max
должны выполняться параллельно, а не один за другим.
Темы: concurrent.futures
Посмотрим, что произойдет, если мы запустим агрегатные функции в отдельных потоках, все еще используя tee
для дублирования исходного итератора:
def max_and_sum_threads_simple(it):
it1, it2 = itertools.tee(it)
with concurrent.futures.ThreadPoolExecutor(2) as executor:
sum_future = executor.submit(lambda: sum(it1))
max_future = executor.submit(lambda: max(it2))
return sum_future.result(), max_future.result()
Теперь sum
и max
фактически работают параллельно (насколько позволяет GIL ), потоки управляются превосходным модулем concurrent.futures
. Однако у него есть фатальный недостаток: чтобы tee
не буферизовать данные, sum
и max
должны обрабатывать свои элементы с одинаковой скоростью. Если один даже немного быстрее другого, они разойдутся, и tee
буферизует все промежуточные элементы. Поскольку невозможно предсказать, как быстро будет работать каждый из них, объем буферизации непредсказуем и имеет самый ужасный случай буферизации всего.
Чтобы гарантировать, что буферизация не происходит, tee
необходимо заменить на специальный генератор, который ничего не буферизует и блокирует, пока все потребители не наблюдают предыдущее значение, прежде чем перейти к следующему. Как и прежде, каждый потребитель работает в своем собственном потоке, но теперь вызывающий поток занят выполнением производителя, цикла, который фактически перебирает исходный итератор и сигнализирует о наличии нового значения. Вот реализация:
def max_and_sum_threads(it):
STOP = object()
next_val = None
consumed = threading.Barrier(2 + 1) # 2 consumers + 1 producer
val_id = 0
got_val = threading.Condition()
def send(val):
nonlocal next_val, val_id
consumed.wait()
with got_val:
next_val = val
val_id += 1
got_val.notify_all()
def produce():
for elem in it:
send(elem)
send(STOP)
def consume():
last_val_id = -1
while True:
consumed.wait()
with got_val:
got_val.wait_for(lambda: val_id != last_val_id)
if next_val is STOP:
return
yield next_val
last_val_id = val_id
with concurrent.futures.ThreadPoolExecutor(2) as executor:
sum_future = executor.submit(lambda: sum(consume()))
max_future = executor.submit(lambda: max(consume()))
produce()
return sum_future.result(), max_future.result()
Это довольно большой объем кода для чего-то столь простого концептуально, но он необходим для правильной работы.
produce()
перебирает внешний итератор и отправляет элементы потребителям по одному значению за раз.Он использует барьер , удобный примитив синхронизации, добавленный в Python 3.2, чтобы дождаться, пока все потребители закончат со старым значением, прежде чем перезаписать его новым значением в next_val
.Как только новое значение будет готово, передается условие .consume()
- это генератор, который передает полученные значения по мере их поступления до обнаружения STOP
.Код может быть обобщен, запускать любое количество агрегатных функций параллельно, создавая потребителей в цикле и корректируя их количество при создании барьера.
Недостатком этой реализации является то, что она требует создания потоков (возможно, облегченных).сделав пул потоков глобальным) и много очень тщательной синхронизации на каждом проходе итерации.Эта синхронизация снижает производительность - эта версия почти в 2000 раз медленнее однопоточной tee
и в 475 раз медленнее простой, но недетерминированной многопоточной версии.
Тем не менее, пока используются потоки,нельзя избежать синхронизации в той или иной форме.Чтобы полностью исключить синхронизацию, мы должны отказаться от потоков и перейти к совместной многозадачности.Вопрос в том, можно ли приостановить выполнение обычных синхронных функций, таких как sum
и max
, чтобы переключаться между ними?
Волокна: гринлет
Получается, что greenlet
сторонний модуль расширения позволяет именно это.Greenlets - это реализация волокон , легких микропотоков, которые явно переключаются между собой.Это похоже на генераторы Python, которые используют yield
для приостановки, за исключением того, что гринлеты предлагают гораздо более гибкий механизм приостановки, позволяющий выбрать, кого приостановить до .
Это делает егодовольно просто портировать резьбовую версию max_and_sum
на гринлеты:
def max_and_sum_greenlet(it):
STOP = object()
consumers = None
def send(val):
for g in consumers:
g.switch(val)
def produce():
for elem in it:
send(elem)
send(STOP)
def consume():
g_produce = greenlet.getcurrent().parent
while True:
val = g_produce.switch()
if val is STOP:
return
yield val
sum_result = []
max_result = []
gsum = greenlet.greenlet(lambda: sum_result.append(sum(consume())))
gsum.switch()
gmax = greenlet.greenlet(lambda: max_result.append(max(consume())))
gmax.switch()
consumers = (gsum, gmax)
produce()
return sum_result[0], max_result[0]
Логика та же, но с меньшим количеством кода.Как и раньше, produce
создает значения, полученные из исходного итератора, но его send
не беспокоит синхронизация, так как в этом нет необходимости, когда все является однопоточным.Вместо этого он явно переключается на каждого потребителя по очереди, чтобы сделать свое дело, а потребитель покорно переключается обратно.После прохождения всех потребителей, производитель готов к следующему проходу итерации.
Результаты извлекаются с использованием промежуточного одноэлементного списка, потому что greenlet не обеспечивает доступа к возвращаемому значению целевой функции (и ниthreading.Thread
, поэтому мы выбрали concurrent.futures
выше).
Однако есть и недостатки использования гринлетов.Во-первых, они не поставляются со стандартной библиотекой, вам нужно установить расширение greenlet.Кроме того, greenlet по своей природе непереносим, поскольку код переключения стека не поддерживается ОС и компилятором и может считаться хаком (хотя чрезвычайно умный один),Python, нацеленный на WebAssembly или JVM или GraalVM , вряд ли будет поддерживать гринлет.Это не насущная проблема, но это определенно что-то, о чем нужно помнить в течение длительного времени.
Сопрограммы: asyncio
Начиная с Python 3.5, Python предоставляет собственные сопрограммы.В отличие от greenlets и аналогично генераторам, сопрограммы отличаются от обычных функций и должны быть определены с использованием async def
.Сопрограммы не могут быть легко выполнены из синхронного кода, вместо этого они должны обрабатываться планировщиком, который доводит их до завершения.Планировщик также известен как цикл обработки событий , потому что его другая задача - получать события ввода-вывода и передавать их соответствующим обратным вызовам и сопрограммам.В стандартной библиотеке это роль модуля asyncio
.
Перед реализацией max_and_sum
на основе asyncio, мы должны сначала устранить препятствие. В отличие от greenlet, asyncio может только приостанавливать выполнение сопрограмм, но не произвольных функций. Таким образом, мы должны заменить sum
и max
сопрограммами, которые делают по существу то же самое. Это так же просто, как реализовать их очевидным способом, только заменив for
на async for
, позволив асинхронному итератору приостановить сопрограмму в ожидании следующего значения:
async def asum(it):
s = 0
async for elem in it:
s += elem
return s
async def amax(it):
NONE_YET = object()
largest = NONE_YET
async for elem in it:
if largest is NONE_YET or elem > largest:
largest = elem
if largest is NONE_YET:
raise ValueError("amax() arg is an empty sequence")
return largest
# or, using https://github.com/vxgmichel/aiostream
#
#from aiostream.stream import accumulate
#def asum(it):
# return accumulate(it, initializer=0)
#def amax(it):
# return accumulate(it, max)
Можно разумно спросить, обманывает ли предоставление новой пары агрегатных функций; в конце концов, в предыдущих решениях использовались встроенные sum
и max
. Ответ будет зависеть от точной интерпретации вопроса, но я бы сказал, что новые функции разрешены, потому что они никоим образом не являются специфичными для поставленной задачи. Они делают то же самое, что и встроенные модули, но потребляют асинхронные итераторы. Я подозреваю, что единственная причина, по которой такие функции не существуют где-то в стандартной библиотеке, связана с тем, что сопрограммы и асинхронные итераторы являются относительно новой функцией.
После этого мы можем написать max_and_sum
в качестве сопрограммы:
async def max_and_sum_asyncio(it):
loop = asyncio.get_event_loop()
STOP = object()
next_val = loop.create_future()
consumed = loop.create_future()
used_cnt = 2 # number of consumers
async def produce():
for elem in it:
next_val.set_result(elem)
await consumed
next_val.set_result(STOP)
async def consume():
nonlocal next_val, consumed, used_cnt
while True:
val = await next_val
if val is STOP:
return
yield val
used_cnt -= 1
if not used_cnt:
consumed.set_result(None)
consumed = loop.create_future()
next_val = loop.create_future()
used_cnt = 2
else:
await consumed
s, m, _ = await asyncio.gather(asum(consume()), amax(consume()),
produce())
return s, m
Хотя эта версия основана на переключении между сопрограммами внутри одного потока, как и в случае с использованием greenlet, она выглядит иначе. Asyncio не обеспечивает явное переключение сопрограмм, оно основывает переключение задач на await
примитиве приостановки / возобновления. Целью await
может быть другая сопрограмма, но также и абстрактное «будущее», заполнитель значения, который позже будет заполнен другой сопрограммой. Как только ожидаемое значение становится доступным, цикл обработки событий автоматически возобновляет выполнение сопрограммы с выражением await
, оценивающим указанное значение. Таким образом, вместо produce
переключения на потребителей, он приостанавливает себя, ожидая будущего, которое наступит, когда все потребители увидят произведенную стоимость.
consume()
- это асинхронный генератор , который похож на обычный генератор, за исключением того, что он создает асинхронный итератор, который наши агрегатные сопрограммы уже готовы принять с помощью async for
. Эквивалент асинхронного итератора __next__
называется __anext__
и представляет собой сопрограмму, позволяющую сопрограмме, которая исчерпывает асинхронный итератор, приостанавливать ожидание получения нового значения. Когда работающий асинхронный генератор приостанавливается на await
, async for
наблюдает за приостановкой неявного вызова __anext__
. consume()
делает именно это, когда ожидает значений, предоставленных produce
, и, когда они становятся доступными, передает их для агрегирования сопрограмм, таких как asum
и amax
. Ожидание осуществляется с использованием next_val
future, которое несет следующий элемент из it
. Ожидание этого будущего внутри consume()
приостанавливает асинхронный генератор, а вместе с ним и совокупную сопрограмму.
Преимущество этого подхода по сравнению с явным переключением гринлетов состоит в том, что он значительно упрощает объединение сопрограмм, которые не знают друг друга, в один и тот же цикл обработки событий. Например, можно иметь два экземпляра max_and_sum
, работающих параллельно (в одном потоке), или запустить более сложную агрегатную функцию, которая вызывала дополнительный асинхронный код для выполнения вычислений.
Следующая удобная функция показывает, как запустить вышеприведенный код из неасинхронного кода:
def max_and_sum_asyncio_sync(it):
# trivially instantiate the coroutine and execute it in the
# default event loop
coro = max_and_sum_asyncio(it)
return asyncio.get_event_loop().run_until_complete(coro)
Performance
Измерение и сравнение производительности этих подходов к параллельному выполнению может вводить в заблуждение, поскольку sum
и max
почти не обрабатывают, что чрезмерно увеличивает издержки параллелизации. Относитесь к ним так же, как к любым микробенчмаркам с большим количеством соли. Сказав это, давайте все равно посмотрим на цифры!
Измерения были произведены с использованием Python 3.6 Функции были запущены только один раз и получили range(10000)
, их время измерялось путем вычитания time.time()
до и после выполнения. Вот результаты:
max_and_sum_buffer
и max_and_sum_tee
: 0,66 мс - почти одинаковое время для обоих, с версией tee
немного быстрее.
max_and_sum_threads_simple
: 2,7 мсЭто время очень мало значит из-за недетерминированной буферизации, поэтому это может быть измерение времени для запуска двух потоков и синхронизации, выполняемой внутри Python.
max_and_sum_threads
: 1.29 секунд , самый медленный вариант, примерно в 2000 раз медленнее самого быстрого.Этот ужасный результат, вероятно, вызван сочетанием нескольких синхронизаций, выполняемых на каждом шаге итерации, и их взаимодействия с GIL.
max_and_sum_greenlet
: 25,5 мс, медленнее по сравнению сначальная версия, но намного быстрее, чем версия с резьбой.С достаточно сложной агрегатной функцией можно представить использование этой версии в производстве.
max_and_sum_asyncio
: 351 мс, почти в 14 раз медленнее, чем версия с зеленым.Это разочаровывающий результат, потому что сопрограммы asyncio более легкие, чем гринлеты, и переключение между ними должно быть намного быстрее , чем переключение между волокнами.Вполне вероятно, что накладные расходы на запуск планировщика сопрограмм и цикла обработки событий (что в данном случае является избыточным, учитывая, что код не выполняет ввод-вывод) снижают производительность этого микропроцессора.
max_and_sum_asyncio
с использованием uvloop
: 125 мс.Это более чем в два раза превышает скорость обычного asyncio, но все же почти в 5 раз медленнее, чем у greenlet.
Запуск примеров в PyPy не приводит к значительному ускорению, вНа самом деле, большинство примеров работают немного медленнее, даже после их запуска несколько раз, чтобы обеспечить прогрев JIT.Функция asyncio требует переписать , чтобы не использовать асинхронные генераторы (поскольку PyPy на момент написания этой статьи реализует Python 3.5), и выполняется за несколько менее 100 мс.Это сравнимо с производительностью CPython + uvloop, т.е. лучше, но не драматично по сравнению с greenlet.