проблема производителя / потребителя с многопроцессорностью Python - PullRequest
14 голосов
/ 27 мая 2009

Я пишу серверную программу с одним производителем и несколькими потребителями, меня смущает только то, что в очередь попадает только первый производитель задач потребляется, после чего поставленные в очередь задачи больше не расходуются, они остаются в очереди навсегда.

from multiprocessing import Process, Queue, cpu_count
from http import httpserv
import time

def work(queue):
    while True:
        task = queue.get()
        if task is None:
            break
        time.sleep(5)
        print "task done:", task
    queue.put(None)

class Manager:
    def __init__(self):
        self.queue = Queue()
        self.NUMBER_OF_PROCESSES = cpu_count()

    def start(self):
        self.workers = [Process(target=work, args=(self.queue,))
                        for i in xrange(self.NUMBER_OF_PROCESSES)]
        for w in self.workers:
            w.start()

        httpserv(self.queue)

    def stop(self):
        self.queue.put(None)
        for i in range(self.NUMBER_OF_PROCESSES):
            self.workers[i].join()
        queue.close()

Manager().start()

Производитель - HTTP-сервер, который ставит задачу в очередь после получения. запрос от пользователя. Кажется, что потребительские процессы все еще блокируется при появлении новых задач в очереди, что странно.

P.S. Еще два вопроса, не относящиеся к вышесказанному, я не уверен, если лучше поместить HTTP-сервер в отдельный процесс, чем основной процесс, если да, как я могу заставить основной процесс продолжать работать до того, как все дочерние процессы заканчиваются. Второй вопрос, каков наилучший способ остановить HTTP-сервер изящно?

Редактировать : добавить код производителя, это просто простой сервер Python wsgi:

import fapws._evwsgi as evwsgi
from fapws import base

def httpserv(queue):
    evwsgi.start("0.0.0.0", 8080)
    evwsgi.set_base_module(base)

    def request_1(environ, start_response):
        start_response('200 OK', [('Content-Type','text/html')])
        queue.put('task_1')
        return ["request 1!"]

    def request_2(environ, start_response):
        start_response('200 OK', [('Content-Type','text/html')])
        queue.put('task_2')
        return ["request 2!!"]

    evwsgi.wsgi_cb(("/request_1", request_1))
    evwsgi.wsgi_cb(("/request_2", request_2))

    evwsgi.run()

Ответы [ 3 ]

10 голосов
/ 27 мая 2009

Я думаю, что должно быть что-то не так с частью веб-сервера, поскольку это прекрасно работает:

from multiprocessing import Process, Queue, cpu_count
import random
import time


def serve(queue):
    works = ["task_1", "task_2"]
    while True:
        time.sleep(0.01)
        queue.put(random.choice(works))


def work(id, queue):
    while True:
        task = queue.get()
        if task is None:
            break
        time.sleep(0.05)
        print "%d task:" % id, task
    queue.put(None)


class Manager:
    def __init__(self):
        self.queue = Queue()
        self.NUMBER_OF_PROCESSES = cpu_count()

    def start(self):
        print "starting %d workers" % self.NUMBER_OF_PROCESSES
        self.workers = [Process(target=work, args=(i, self.queue,))
                        for i in xrange(self.NUMBER_OF_PROCESSES)]
        for w in self.workers:
            w.start()

        serve(self.queue)

    def stop(self):
        self.queue.put(None)
        for i in range(self.NUMBER_OF_PROCESS):
            self.workers[i].join()
        self.queue.close()


Manager().start()

Пример вывода:

starting 2 workers
0 task: task_1
1 task: task_2
0 task: task_2
1 task: task_1
0 task: task_1
4 голосов
/ 27 мая 2009

«Второй вопрос, как лучше всего изящно остановить HTTP-сервер?»

Это сложно.

У вас есть два варианта межпроцессного взаимодействия:

  • Внеполосное управление. На сервере есть еще один механизм общения. Другой сокет, сигнал Unix или что-то еще. Что-то еще может быть файлом «stop-now» в локальном каталоге сервера. Кажется странным, но это работает хорошо и проще, чем ввод цикла выбора для прослушивания нескольких сокетов или обработчик сигнала для перехвата сигнала Unis.

    Файл "stop-now" прост в реализации. Цикл evwsgi.run() просто проверяет этот файл после каждого запроса. Чтобы сервер остановился, вы создаете файл, выполняете запрос /control (который получит ошибку 500 или что-то, на самом деле это не имеет значения), и сервер должен остановиться. Не забудьте удалить файл stop-now, иначе ваш сервер не будет перезагружен.

  • Внутриполосное управление. На сервере есть другой URL (/stop), который остановит его. Внешне это похоже на кошмар безопасности, но полностью зависит от того, где и как будет использоваться этот сервер. Поскольку это выглядит как простая оболочка для внутренней очереди запросов, этот дополнительный URL-адрес работает хорошо.

    Чтобы это сработало, вам нужно написать собственную версию evwsgi.run(), которую можно прекратить, установив некоторую переменную таким образом, чтобы она вышла из цикла.

Редактировать

Вы, вероятно, не хотите завершать работу своего сервера, поскольку не знаете состояния его рабочих потоков. Вам нужно дать сигнал серверу, а затем просто подождать, пока он не закончит работу нормально.

Если вы хотите принудительно уничтожить сервер, то os.kill() (или multiprocessing.terminate) будет работать. За исключением, конечно, вы не знаете, что делали дочерние потоки.

1 голос
/ 27 февраля 2011
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...