Моя очередь HelloWorld работает? - PullRequest
2 голосов
/ 24 августа 2011

Я собираюсь использовать этот дизайн в приложении, но я довольно плохо знаком с многопоточностью и очередями в Python.Очевидно, что реальное приложение не для того, чтобы сказать привет, но дизайн тот же - то есть есть процесс, который требует некоторого времени на настройку и демонтаж, но я могу выполнить несколько задач одним ударом.Задачи будут приходить в случайные моменты времени и часто в виде очередей.

Это разумный и безопасный для ниток дизайн?

class HelloThing(object):

  def __init__(self):
    self.queue = self._create_worker()

  def _create_worker(self):
    import threading, Queue

    def worker():
      while True:
        things = [q.get()]
        while True:
          try:
            things.append(q.get_nowait())
          except Queue.Empty:
            break
        self._say_hello(things)
        [q.task_done() for task in xrange(len(things))]

    q = Queue.Queue()
    n_worker_threads = 1
    for i in xrange(n_worker_threads):
      t = threading.Thread(target=worker)
      t.daemon = True
      t.start()

    return q

  def _say_hello(self, greeting_list):
    import time, sys
    # setup stuff
    time.sleep(1)
    # do some things
    sys.stdout.write('hello {0}!\n'.format(', '.join(greeting_list)))
    # tear down stuff
    time.sleep(1)


if __name__ == '__main__':
  print 'enter __main__'

  import time
  hello = HelloThing()

  hello.queue.put('world')
  hello.queue.put('cruel world')
  hello.queue.put('stack overflow')

  time.sleep(2)

  hello.queue.put('a')
  hello.queue.put('b')

  time.sleep(2)

  for i in xrange(20):
    hello.queue.put(str(i))

  #hello.queue.join()

  print 'finish __main__'

1 Ответ

0 голосов
/ 25 августа 2011
  1. Безопасность потока обрабатывается реализацией очереди (также вы должны обращаться с ней в реализации _say_hello, если это требуется).

  2. Проблема обработчика пакета: Пакет должен обрабатываться только одним потоком (например, скажем, настройка / разрыв вашего процесса занимает 10 секунд; в секунду 1 все потоки будут заняты пакетом с секунды 0, вкл. Второй 5 новая задача (или пакет), но нет потока, чтобы обработать их / это). Таким образом, пакет должен быть определен максимальным количеством задач (или может быть «бесконечным») для определенного временного окна. Запись в очереди должна быть списком задач.

Как вы можете группировать список задач? Я предоставляю решение в виде кода, более простого для объяснения ...

producer_q = Queue()
def _burst_thread():
   while True:
      available_tasks = [producer_q.get()]
      time.sleep(BURST_TIME_WINDOW)
      available_tasks.extend(producer_q.get() # I'm the single consumer, so will be at least qsize elements  
                             for i in range(producer_q.qsize()))
      consumer_q.push(available_tasks)

Если вы хотите, чтобы в пакете было максимум сообщений, вам просто нужно разделить available_tasks в нескольких списках.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...