Несколько клиентов для генератора Python? - PullRequest
1 голос
/ 19 марта 2012

Как продолжение до этого вопроса Я пытаюсь обойти построение списка, примером которого является range(int(1e8)), с помощью генератора xrange(int(1e8)). Где xrange - это просто пример для процесса, который выдает длинную последовательность значений. (Пожалуйста, предположите, что это не может быть легко воспроизведено.) Еще один фон, у меня есть длинный список пар меток времени / значений, над которыми я хочу провести некоторую обработку (вроде временных рядов). Я стараюсь не вытягивать их в память в целом, потому что это много данных.

Я подумал, что было бы здорово, если бы я мог применить несколько блоков обработки одновременно к этому потоку данных, генерируемых моим генератором. Первой идеей было использование itertools.tee(), например ::

from itertools import tee
g1,g2 = tee(xrange(int(1e8)),2)
sum(g1), sum(g2)

Но потом я обнаружил, что только первый sum() будет использовать генератор, в то время как tee() снова создаст list (чего я хотел избежать.)

Поэтому я подумал, что мне нужно асинхронное решение, то есть такое, которое позволило бы каждому sum() обновлять каждый шаг генератора. Вещи, которые пришли в голову, где

Но я никогда не использовал их раньше, и отчасти я даже не могу сказать, могут ли подходы работать или быть эффективными / действенными / эффективными.

С этого момента я с удовольствием буду рад любым предложениям из зала!


Обновление

Я хотел бы избежать решения на основе обратного вызова , так как оно значительно снижает производительность (именно так оно и реализуется в настоящее время). Я добавил некоторые профилирования ниже (пожалуйста, добавьте комментарии, если тест не является объективным):

class SinkA:
  def __init__(self, src):
    for i in src: pass

class SinkB:
  def f(self,i):
    pass

class Source:
  def __iter__(self):
    for i in xrange(int(1e4)):
      yield i

def t1():
  src = Source()
  snk = SinkA(src)

def t2():
  src = Source()
  snk = SinkB()
  for i in src: snk.f(i)

if __name__ == "__main__":
    from timeit import Timer
    n = 1000
    t = Timer("t1()", "from __main__ import t1, t2, SinkA, SinkB, Source")
    print "%.2f usec/pass" % (1000000 * t.timeit(number=n)/n) # 612.11 usec/pass
    t = Timer("t2()", "from __main__ import t1, t2, SinkA, SinkB, Source")
    print "%.2f usec/pass" % (1000000 * t.timeit(number=n)/n) # 1933.39 usec/pass

Обновление 2

Что еще я могу сказать? У меня есть это решение на основе обратного вызова, которое кажется неэффективным. Подход на основе генератора кажется многообещающим, но у меня слишком мало опыта в такого рода программировании, особенно когда речь идет о более сложных вещах, таких как сопрограммы или искаженная библиотека. Подводя итог, у меня есть несколько потребителей для процесса, который генерирует много данных, и я определил некоторые потенциальные подходы. Сейчас я ищу квалифицированные заявления опытных пользователей, которые, вероятно, уже выполняли аналогичные задачи. Заявления о том, какой подход может быть подходящим, как эти подходы связаны друг с другом. Или какие другие подходы я мог бы пропустить в конце концов.

Ответы [ 4 ]

6 голосов
/ 19 марта 2012

В качестве общего подхода я бы заменил модель вытягивания генератора на обратные вызовы и, возможно, обернул бы генератор, например так:

def walk(gen, callbacks):
    for item in gen:
        for f in callbacks:
            f(item)

Если ваши процессоры находятся в отдельных потоках и вы хотите, чтобыБлокируя ожидание, вы можете зарегистрировать Queue.put (или что-нибудь эквивалентное) в качестве обратного вызова для каждого процессора и независимо опросить эти очереди.Это позволит вам использовать как модели вещания, так и модели рабочих пулов, если вам это необходимо.

Редактировать

Другим решением будет использование сопрограмм :

def source(self, *dests):
    for i in xrange(int(1e4)):
        for dest in dests:
            dest.send(i)

def sink():
    while True:
        i = yield

def t3():
    snk = sink()
    snk.next() # activate the coroutine
    source(snk)

if __name__ == '__main__':

    from timeit import Timer
    n = 1000
    t = Timer("t3()", "from __main__ import source, sink, t3")
    print "%.2f usec/pass" % (1000000 * t.timeit(number=n)/n) # 872.99 usec/pass

Выглядит достаточно быстро.По сути, сопрограммы - это инвертированные генераторы, вы тянете от генератора, толкаете к сопрограмме.

1 голос
/ 19 марта 2012

Поскольку генераторы дешевы в памяти, почему бы вам просто не использовать два независимых генератора?

g1 = xrange(int(1e8))
g2 = xrange(int(1e8))
sum(g1), sum(g2)
1 голос
/ 19 марта 2012

Вы на самом деле не обращаетесь к этому, но хотите ли вы, чтобы каждый потребитель видел одни и те же данные (в этом случае tee, вероятно, является лучшим решением), или нет?

Если нет, то вы можете просто прочитать каждого потребителя из одного объекта-генератора.

Если вы хотите, чтобы они получали точно такие же данные, попробуйте tee (использует больше памяти)против двух генераторов (больше ввода-вывода), и посмотрите, что быстрее.

Что касается ваших временных параметров, то ваши данные показывают лишь то, что есть издержки для нескольких вызовов функций и что один из ваших методов избегает промежуточной функциизвонки.

Если вы хотите улучшить производительность, попробуйте запустить это на PyPy, который имеет JIT, оптимизирующий точку доступа.

0 голосов
/ 19 марта 2012

Я предлагаю вам посмотреть, как реализовать это с сопрограммами , а точнее с этим примером вещания

...