Как вы можете передать итерацию нескольким потребителям в постоянном пространстве?
TLDR
Напишите реализацию, которая проходит следующий тест в CONSTANT SPACE, обрабатывая min
, max
и sum
как черные ящики.
def testit(implementation, N):
assert implementation(range(N), min, max, sum) == (0, N-1, N*(N-1)//2)
Обсуждение
Мы любим итераторы, потому что они позволяют нам обрабатывать потоки данных лениво, позволяя обрабатывать огромные объемы данных в CONSTANT SPACE.
def source_summary(source, summary):
return summary(source)
N = 10 ** 8
print(source_summary(range(N), min))
print(source_summary(range(N), max))
print(source_summary(range(N), sum))
Каждая строка выполнялась в течение нескольких секунд, но использовала очень мало памяти. Однако для этого потребовалось 3 отдельных обхода источника. Так что это не будет работать, если вашим источником является сетевое соединение, оборудование для сбора данных и т. Д. c. если только вы не кешируете все данные где-то, теряя требование CONSTANT SPACE.
Вот версия, которая демонстрирует эту проблему
def source_summaries(source, *summaries):
from itertools import tee
return tuple(map(source_summary, tee(source, len(summaries)),
summaries))
testit(source_summaries, N)
print('OK')
Тест проходит, но tee
пришлось сохранить копию все данные, поэтому использование пространства возрастает с O(1)
до O(N)
.
Как вы можете получить результаты за один проход с постоянной памятью?
Это, конечно, Можно пройти тест, указанный сверху, с использованием O(1)
пространства, путем мошенничества: используя знания конкретных c пользователей-итераторов, которые используются в тесте. Но дело не в этом: source_summaries
должен работать с любыми итераторами, такими как set
, collections.Counter
, ''.join
, включая все, что может быть написано в будущем. Реализация должна рассматривать их как черные ящики.
Для ясности: знания о потребителях only состоят в том, что каждый из них потребляет одну итерацию и возвращает один результат. Использование любых других знаний о потребителе - это мошенничество.
Идеи
[ РЕДАКТИРОВАТЬ : я опубликовал реализацию этой идеи в качестве ответа]
Я могу представить себе решение (которое мне действительно не нравится), которое использует
Давайте назовем пользовательский итератор link
.
- . Для каждого потребителя выполните
result = consumer(<link instance for this thread>)
<link instance for this thread>.set_result(result)
на отдельный поток.
- В главном потоке что-то вроде строк
for item in source:
for l in links:
l.push(item)
for l in links:
l.stop()
for thread in threads:
thread.join()
return tuple(link.get_result, links)
link.__next__
до экземпляра link
получает
.push(item)
, в этом случае он возвращает элемент .stop()
, в этом случае он поднимает StopIteration
Данные гонки выглядят как кошмар. Вам понадобится очередь для толчков, и, вероятно, нужно будет поместить в очередь дозорный объект link.stop()
... и кучу других вещей, которые я пропускаю.
Я бы предпочел использовать кооперативную многопоточность, но consumer(link)
кажется неизбежно некооперативной.
У вас есть менее грязные предложения?