потоки Python и очереди для бесконечного ввода данных (поток) - PullRequest
1 голос
/ 06 июля 2010

Я хотел бы использовать поток для обработки потокового ввода.

Как создать приведенный ниже код для бесконечного ввода, например, с помощью itertools.count

Код ниже будет работать, если: 'для i в itertools.count ():' заменяется на 'для i в xrange (5):'

from threading import Thread
from Queue import Queue, Empty
import itertools

def do_work(q):
  while True:
    try:
        x = q.get(block=False)
        print (x)
    except Empty:
        break

if __name__ == "__main__":
  work_queue = Queue()
  for i in itertools.count():
    work_queue.put(i)

  threads = [Thread(target=do_work, args=(work_queue,)) for i in range(8)]

  for t in threads: t.start()
  for t in threads: t.join()

Ответы [ 3 ]

2 голосов
/ 06 июля 2010

Проблема в том, что itertools.count генерирует бесконечную последовательность. Это означает, что цикл for никогда не закончится. Вы должны поместить это в свою собственную функцию и создать отдельный поток. Таким образом, у вас будет увеличиваться очередь, пока рабочие потоки получат данные из очереди.

1 голос
/ 06 июля 2010

Вам нужно заполнить очередь потоком.Вам нужно управлять размером очереди.Особенно, если рабочие тратят время на обработку предметов.Вам нужно отметить выполненные пункты очереди.Если это связано с вашим другим вопросом о твиттере и «чрезвычайно быстром» вводе, то у вас есть намного больше общего с вставками базы данных.

Ваши вопросы были слишком расплывчаты по довольно сложным темам.Кажется, вы недостаточно понимаете даже то, чего пытаетесь достичь, чтобы понять, что это нелегко.Я рекомендую вам немного более точно определить, что вы пытаетесь сделать.

Вот пример заполнения и использования очереди потоками.Размер очереди не управляется.

from threading import Thread
from Queue import Queue, Empty, Full
import itertools
from time import sleep


def do_work(q,wkr):
  while True:
    try:
      x = q.get(block=True,timeout=10)
      q.task_done()
      print "Wkr %s: Consuming %s" % (wkr,x)
      sleep(0.01)
    except Empty:
      print "Wkr %s exiting, timeout/empty" % (wkr)
      break
    sleep(0.01)

def fill_queue(q,limit=1000):
  count = itertools.count()
  while True:
    n = count.next()
    try:
      q.put(n,block=True,timeout=10)
    except Full:
      print "Filler exiting, timeout/full"
      break
    if n >= limit:
      print "Filler exiting, reached limit - %s" % limit
      break
    sleep(0.01)

work_queue = Queue()

threads = [Thread(target=do_work, args=(work_queue,i)) for i in range(2)]
threads.insert(0,Thread(target=fill_queue,args=(work_queue,100)))

for t in threads:
  t.start()

for t in threads:
  t.join()

 Wkr 0: Consuming 0
 Wkr 1: Consuming 1
 Wkr 0: Consuming 2
 Wkr 1: Consuming 3
 ....
 Wkr 1: Consuming 99
 Filler exiting, reached limit - 100
 Wkr 0: Consuming 100
 Wkr 1 exiting, timeout/empty
 Wkr 0 exiting, timeout/empty
1 голос
/ 06 июля 2010

Может быть, я что-то упускаю, но разве это не так просто, как создавать и запускать потоки до цикла for?

Кроме того, прерывание ваших потоков при отсутствии работы кажется плохимидея, поскольку в будущем может появиться больше работы.Разумеется, вы хотите, чтобы они блокировались, пока не появятся какие-либо работы?

...