Как вы можете кормить итерируемое для нескольких потребителей в постоянном пространстве? - PullRequest
0 голосов
/ 08 апреля 2020

Как вы можете передать итерацию нескольким потребителям в постоянном пространстве?

TLDR

Напишите реализацию, которая проходит следующий тест в CONSTANT SPACE, обрабатывая min, max и sum как черные ящики.

def testit(implementation, N):
    assert implementation(range(N), min, max, sum) == (0, N-1, N*(N-1)//2)

Обсуждение

Мы любим итераторы, потому что они позволяют нам обрабатывать потоки данных лениво, позволяя обрабатывать огромные объемы данных в CONSTANT SPACE.

def source_summary(source, summary):
    return summary(source)

N = 10 ** 8
print(source_summary(range(N), min))
print(source_summary(range(N), max))
print(source_summary(range(N), sum))

Каждая строка выполнялась в течение нескольких секунд, но использовала очень мало памяти. Однако для этого потребовалось 3 отдельных обхода источника. Так что это не будет работать, если вашим источником является сетевое соединение, оборудование для сбора данных и т. Д. c. если только вы не кешируете все данные где-то, теряя требование CONSTANT SPACE.

Вот версия, которая демонстрирует эту проблему

def source_summaries(source, *summaries):
    from itertools import tee
    return tuple(map(source_summary, tee(source, len(summaries)),
                                     summaries))

testit(source_summaries, N)
print('OK')

Тест проходит, но tee пришлось сохранить копию все данные, поэтому использование пространства возрастает с O(1) до O(N).

Как вы можете получить результаты за один проход с постоянной памятью?

Это, конечно, Можно пройти тест, указанный сверху, с использованием O(1) пространства, путем мошенничества: используя знания конкретных c пользователей-итераторов, которые используются в тесте. Но дело не в этом: source_summaries должен работать с любыми итераторами, такими как set, collections.Counter, ''.join, включая все, что может быть написано в будущем. Реализация должна рассматривать их как черные ящики.

Для ясности: знания о потребителях only состоят в том, что каждый из них потребляет одну итерацию и возвращает один результат. Использование любых других знаний о потребителе - это мошенничество.

Идеи

[ РЕДАКТИРОВАТЬ : я опубликовал реализацию этой идеи в качестве ответа]

Я могу представить себе решение (которое мне действительно не нравится), которое использует

  • упреждающую многопоточность

  • пользовательский итератор, связывающий потребителя с source

Давайте назовем пользовательский итератор link.

  • . Для каждого потребителя выполните
result = consumer(<link instance for this thread>)
<link instance for this thread>.set_result(result)

на отдельный поток.

  • В главном потоке что-то вроде строк
for item in source:
    for l in links:
        l.push(item)

for l in links:
    l.stop()

for thread in threads:
    thread.join()

return tuple(link.get_result, links)
  • link.__next__ до экземпляра link получает

    • .push(item), в этом случае он возвращает элемент
    • .stop(), в этом случае он поднимает StopIteration
  • Данные гонки выглядят как кошмар. Вам понадобится очередь для толчков, и, вероятно, нужно будет поместить в очередь дозорный объект link.stop() ... и кучу других вещей, которые я пропускаю.

Я бы предпочел использовать кооперативную многопоточность, но consumer(link) кажется неизбежно некооперативной.

У вас есть менее грязные предложения?

Ответы [ 2 ]

1 голос
/ 09 апреля 2020

Вот альтернативная реализация вашей идеи. Он использует совместную многопоточность. Как вы и предлагали, ключевым моментом является использование многопоточности и наличия блока методов итераторов __next__, пока все потоки не будут использовать текущую итерацию.

Кроме того, итератор содержит (необязательный) буфер постоянной размер. С помощью этого буфера мы можем читать источник в блоках и избегать многих блокировок / синхронизации.

Моя реализация также обрабатывает случай, когда некоторые потребители прекращают итерацию до достижения конца итератора.

import threading

class BufferedMultiIter:
    def __init__(self, source, n, bufsize = 1):
        '''`source` is an iterator or iterable,
        `n` is the number of threads that will interact with this iterator,
        `bufsize` is the size of the internal buffer. The iterator will read
        and buffer elements from `source` in chunks of `bufsize`. The bigger
        the buffer is, the better the performance but also the bigger the
        (constant) space requirement.
        '''
        self._source = iter(source)
        self._n = n
        # Condition variable for synchronization
        self._cond = threading.Condition()
        # Buffered values
        bufsize = max(bufsize, 1)
        self._buffer = [None] * bufsize
        self._buffered = 0
        self._next = threading.local()
        # State variables to implement the "wait for buffer to get refilled"
        # protocol
        self._serial = 0
        self._waiting = 0

        # True if we reached the end of the source
        self._stop = False
        # Was the thread killed (for error handling)?
        self._killed = False

    def _fill_buffer(self):
        '''Refill the internal buffer.'''
        self._buffered = 0
        while self._buffered < len(self._buffer):
            try:
                self._buffer[self._buffered] = next(self._source)
                self._buffered += 1
            except StopIteration:
                self._stop = True
                break
            # Explicitly clear the unused part of the buffer to release
            # references as early as possible
            for i in range(self._buffered, len(self._buffer)):
                self._buffer[i] = None
        self._waiting = 0
        self._serial += 1

    def register_thread(self):
        '''Register a thread.

        Each thread that wants to access this iterator must first register
        with the iterator. It is an error to register the same thread more
        than once. It is an error to access this iterator with a thread that
        was not registered (with the exception of calling `kill`). It is an
        error to register more threads than the number that was passed to the
        constructor.
        '''
        self._next.i = 0

    def unregister_thread(self):
        '''Unregister a thread from this iterator.

        This should be called when a thread is done using the iterator.
        It catches the case in which a consumer does not consume all the
        elements from the iterator but exits early.
        '''
        assert hasattr(self._next, 'i')
        delattr(self._next, 'i')
        with self._cond:
            assert self._n > 0
            self._n -= 1
            if self._waiting == self._n:
                self._fill_buffer()
            self._cond.notify_all()

    def kill(self):
        '''Forcibly kill this iterator.

        This will wake up all threads currently blocked in `__next__` and
        will have them raise a `StopIteration`.
        This function should be called in case of error to terminate all
        threads as fast as possible.
        '''
        self._cond.acquire()
        self._killed = True
        self._stop = True
        self._cond.notify_all()
        self._cond.release()
    def __iter__(self): return self
    def __next__(self):
        if self._next.i == self._buffered:
            # We read everything from the buffer.
            # Wait until all other threads have also consumed the buffer
            # completely and then refill it.
            with self._cond:
                old = self._serial
                self._waiting += 1
                if self._waiting == self._n:
                    self._fill_buffer()
                    self._cond.notify_all()
                else:
                    # Wait until the serial number changes. A change in
                    # serial number indicates that another thread has filled
                    # the buffer
                    while self._serial == old and not self._killed:
                        self._cond.wait()
            # Start at beginning of newly filled buffer
            self._next.i = 0

        if self._killed:
            raise StopIteration
        k = self._next.i
        if k == self._buffered and self._stop:
            raise StopIteration
        value = self._buffer[k]
        self._next.i = k + 1
        return value

class NotAll:
    '''A consumer that does not consume all the elements from the source.'''
    def __init__(self, limit):
        self._limit = limit
        self._consumed = 0
    def __call__(self, it):
        last = None
        for k in it:
            last = k
            self._consumed += 1
            if self._consumed >= self._limit:
                break
        return last

def multi_iter(iterable, *consumers, **kwargs):
    '''Iterate using multiple consumers.

    Each value in `iterable` is presented to each of the `consumers`.
    The function returns a tuple with the results of all `consumers`.

    There is an optional `bufsize` argument. This controls the internal
    buffer size. The bigger the buffer, the better the performance, but also
    the bigger the (constant) space requirement of the operation.

    NOTE: This will spawn a new thread for each consumer! The iteration is
    multi-threaded and happens in parallel for each element.
    '''
    n = len(consumers)
    it = BufferedMultiIter(iterable, n, kwargs.get('bufsize', 1))
    threads = list() # List with **running** threads
    result = [None] * n
    def thread_func(i, c):
        it.register_thread()
        result[i] = c(it)
        it.unregister_thread()
    try:
        for c in consumers:
            t = threading.Thread(target = thread_func, args = (len(threads), c))
            t.start()
            threads.append(t)
    except:
        # Here we should forcibly kill all the threads but there is not
        # t.kill() function or similar. So the best we can do is stop the
        # iterator
        it.kill()
    finally:
        while len(threads) > 0:
            t = threads.pop(-1)
            t.join()
    return tuple(result)

from time import time
N = 10 ** 7
notall1 = NotAll(1)
notall1000 = NotAll(1000)
start1 = time()
res1 = (min(range(N)), max(range(N)), sum(range(N)), NotAll(1)(range(N)),
        NotAll(1000)(range(N)))
stop1 = time()
print('5 iterators: %s %.2f' % (str(res1), stop1 - start1))

for p in range(5):
    start2 = time()
    res2 = multi_iter(range(N), min, max, sum, NotAll(1), NotAll(1000),
                      bufsize = 2**p)
    stop2 = time()
    print('multi_iter%d: %s %.2f' % (p, str(res2), stop2 - start2))

Время снова ужасное, но вы можете видеть, как использование буфера постоянного размера значительно улучшает ситуацию:

5 iterators: (0, 9999999, 49999995000000, 0, 999) 0.71
multi_iter0: (0, 9999999, 49999995000000, 0, 999) 342.36
multi_iter1: (0, 9999999, 49999995000000, 0, 999) 264.71
multi_iter2: (0, 9999999, 49999995000000, 0, 999) 151.06
multi_iter3: (0, 9999999, 49999995000000, 0, 999) 95.79
multi_iter4: (0, 9999999, 49999995000000, 0, 999) 72.79

Возможно, это может послужить источником идей для хорошей реализации.

0 голосов
/ 08 апреля 2020

Вот реализация решения с вытесняющим потоком, изложенного в исходном вопросе.

[РЕДАКТИРОВАТЬ: Существует серьезная проблема с этой реализацией. [РЕДАКТИРОВАТЬ, теперь исправлено, используя решение, вдохновленное Дэниелом Юнгласом.]

Потребители, которые не выполняют итерацию через целом , приведут к утечке пространства в очереди внутри Link. Например:


def exceeds_10(iterable):
    for item in iterable:
        if item > 10:
            return True
    return False

, если вы используете это как один из потребителей и используете источник range(10**6), он прекратит удалять элементы из очереди внутри Link после первых 11 элементов, оставляя примерно 10**6 пунктов, которые будут накапливаться в очереди!

]


class Link:

    def __init__(self, queue):
        self.queue = queue

    def __iter__(self):
        return self

    def __next__(self):
        item = self.queue.get()
        if item is FINISHED:
            raise StopIteration
        return item

    def put(self, item):
        self.queue.put(item)

    def stop(self):
        self.queue.put(FINISHED)

    def consumer_not_listening_any_more(self):
        self.__class__ = ClosedLink


class ClosedLink:

    def put(self, _): pass
    def stop(self)  : pass


class FINISHED: pass


def make_thread(link, consumer, future):
    from threading import Thread
    return Thread(target = lambda: on_thread(link, consumer, future))

def on_thread(link, consumer, future):
    future.set_result(consumer(link))
    link.consumer_not_listening_any_more()

def source_summaries_PREEMPTIVE_THREAD(source, *consumers):
    from queue     import SimpleQueue as Queue
    from asyncio   import Future

    links   = tuple(Link(Queue()) for _ in consumers)
    futures = tuple(     Future() for _ in consumers)
    threads = tuple(map(make_thread, links, consumers, futures))

    for thread in threads:
        thread.start()

    for item in source:
        for link in links:
            link.put(item)

    for link in links:
        link.stop()

    for t in threads:
        t.join()

    return tuple(f.result() for f in futures)

Работает, но (что неудивительно) с ужасным ухудшением производительности:

def time(thunk):
    from time import time
    start = time()
    thunk()
    stop  = time()
    return stop - start

N = 10 ** 7
t = time(lambda: testit(source_summaries, N))
print(f'old: {N} in {t:5.1f} s')

t = time(lambda: testit(source_summaries_PREEMPTIVE_THREAD, N))
print(f'new: {N} in {t:5.1f} s')

дача

old: 10000000 in   1.2 s
new: 10000000 in  30.1 s

Итак, хотя это теоретическое решение, оно не практическое [*].

Следовательно, я думаю, что этот подход - тупик, если только нет способ убедить consumer совместно уступить (в отличие от принуждения к уступке преимущественно) в

def on_thread(link, consumer, future):
    future.set_result(consumer(link))

... но это кажется принципиально невозможным. Хотелось бы оказаться неправым.

[*] Это на самом деле немного сложнее sh: тест абсолютно ничего не делает с тривиальными данными; если бы это было частью более крупного вычисления, которое выполняло тяжелые вычисления для элементов, то этот подход мог бы быть действительно полезным.

...