Динамические процессы в Python - PullRequest
4 голосов
/ 12 апреля 2009

У меня есть вопрос, касающийся многопроцессорной обработки Python. Я пытаюсь взять набор данных, разбить его на куски и передать эти куски одновременно выполняющимся процессам. Мне нужно преобразовать большие таблицы данных, используя простые вычисления (например, электрическое сопротивление -> температура для термистора).

Код, приведенный ниже, работает почти так, как нужно, но, похоже, он не порождает каких-либо новых процессов (или, если да, только один за раз). Я новичок в Python, поэтому, вероятно, есть довольно простое решение этой проблемы.

Заранее спасибо!

from multiprocessing import Process

class Worker(Process):
    # example data transform
    def process(self, x): return (x * 2) / 3

    def __init__(self, list):
        self.data = list
        self.result = map(self.process, self.data)
        super(Worker, self).__init__()

if __name__ == '__main__':
    start = datetime.datetime.now()
    dataset = range(10000) # null dataset
    processes = 3

    for i in range(processes):
        chunk = int(math.floor(len(dataset) / float(processes)))

        if i + 1 == processes:
            remainder = len(dataset) % processes
        else: remainder = 0

        tmp = dataset[i * chunk : (i + 1) * chunk + remainder]
        exec('worker'+str(i)+' = Worker(tmp)')
        exec('worker'+str(i)+'.start()')

    for i in range(processes):
        exec('worker'+str(i)+'.join()')
        # just a placeholder to make sure the initial values of the set are as expected
        exec('print worker'+str(i)+'.result[0]')

Ответы [ 3 ]

1 голос
/ 13 апреля 2009

Нет необходимости отправлять количество блоков каждому процессу, просто используйте get_nowait () и обработайте возможное исключение Queue.Empty. Каждый процесс будет получать разное количество процессорного времени, и это должно держать их всех занятыми.

import multiprocessing, Queue

class Worker(multiprocessing.Process):
    def process(self, x): 
        for i in range(15):
            x += (float(i) / 2.6)
        return x

    def __init__(self, input, output):
        self.input = input
        self.output = output
        super(Worker, self).__init__()

    def run(self):
        try:
            while True:
                self.output.put(self.process(self.input.get_nowait()))
        except Queue.Empty:
            pass


if name == 'main':
    dataset = range(10)
    processes = multiprocessing.cpu_count()
    input = multiprocessing.Queue()
    output = multiprocessing.Queue()

    for obj in dataset:
        input.put(obj)
    for i in range(processes):
        Worker(input, output).start()

    for i in range(len(dataset)):
        print output.get()
1 голос
/ 12 апреля 2009

Вы не переопределили метод run. Есть два способа, с помощью которых процессы (или потоки) могут выполнять код:

  1. Создать процесс с указанием цели
  2. Подкласс процесса, переопределяющий метод run.

Переопределение __init__ просто означает, что все ваши процессы одеты и некуда идти. Он должен использоваться для придания ему атрибутов, необходимых для выполнения того, что ему нужно, но не должен указывать задачу, которую нужно выполнить.

В вашем коде все тяжелые операции выполняются в этой строке:

exec('worker'+str(i)+' = Worker(tmp)')

и здесь ничего не делается:

exec('worker'+str(i)+'.start()')

Так что проверка результатов с помощью exec('print worker'+str(i)+'.result[0]') должна дать вам что-то значимое, но только потому, что код, который вы хотите выполнить , был выполнен , но при построении процесса, а не при запуске процесса.

Попробуйте это:

class Worker(Process):
    # example data transform
    def process(self, x): return (x * 2) / 3

    def __init__(self, list):
        self.data = list
        self.result = []
        super(Worker, self).__init__()

    def run(self):
        self.result = map(self.process, self.data)

EDIT:

Хорошо ... так что я просто летал, основываясь на своих инстинктах, и все они были не правы. Что мы оба не поняли о процессах, так это то, что вы не можете напрямую делиться переменными. Все, что вы передаете новому процессу, читается, копируется и исчезает навсегда. Если вы не используете один из двух стандартных способов обмена данными: очереди и каналы . Я немного поиграл, пытаясь заставить твой код работать, но пока не повезло. Я думаю, что это поставит вас на правильный путь.

0 голосов
/ 13 апреля 2009

Хорошо, похоже, список не был потокобезопасным, и я перешел к использованию очереди (хотя, похоже, она намного медленнее). Этот код в основном выполняет то, что я пытался сделать:

import math, multiprocessing

class Worker(multiprocessing.Process):
    def process(self, x): 
        for i in range(15):
            x += (float(i) / 2.6)
        return x

    def __init__(self, input, output, chunksize):
        self.input = input
        self.output = output
        self.chunksize = chunksize
        super(Worker, self).__init__()

    def run(self):
        for x in range(self.chunksize):
            self.output.put(self.process(self.input.get()))


if __name__ == '__main__':
    dataset = range(10)
    processes = multiprocessing.cpu_count()
    input = multiprocessing.Queue()
    output = multiprocessing.Queue()

    for obj in dataset:
        input.put(obj)

    for i in range(processes):
        chunk = int(math.floor(len(dataset) / float(processes)))
        if i + 1 == processes:
            remainder = len(dataset) % processes
        else: remainder = 0

        Worker(input, output, chunk + remainder).start()

    for i in range(len(dataset)):
        print output.get()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...