Можно ли сделать многопроцессорную работу внутри цикла в Python 3.2 - PullRequest
4 голосов
/ 27 августа 2011

Я пытаюсь использовать python (3.2) для многопроцессорности (ubuntu), чтобы решить большую проблему поиска.По сути, я хочу взять список, вынуть первый элемент, найти все остальные элементы, которые имеют те же свойства, что и объект, объединить найденные элементы и целевой элемент в один список, удалить их из исходного списка и (цикл)сделай все это снова.Многопроцессорность предназначена для разделения работы между процессорами.Код выполняется один раз без проблем.Фактически, он также будет зацикливаться, так как исключение игнорируется и, похоже, работает хорошо.Но в течение 30 секунд он израсходовал почти все мои 16 ГБ оперативной памяти.

На данный момент у меня есть две проблемы: 1) я получаю «Exception AssertionError: AssertionError (« могу только проверить дочерний процесс »,как только я зациклюсь (и получу их много).Наряду с этим большое количество использования ОЗУ (которое, я думаю, может быть связано, не уверен).И 2) Кажется, он даже не выполняет поиск параллельно, когда я использую больший набор данных.

Мой код выглядит так:

class triangleListWorker(multiprocessing.Process):
    def __init__(self, work_queue, target, results,start):
        super().__init__()
        self.work_queue = work_queue
        self.results = results
        self.target = target
        self.startIndex = start
    def run(self):
        while True:
            try:
                searching = self.work_queue.get()
                self.do_search(searching)

            finally:
                self.work_queue.task_done()

    def do_search(self,searching):
        for x in range(len(searching)):
            if self.target.same_plane(searching[x]):
                self.results.append(self.startIndex+x)

Что я пытаюсьДля этого используется Manager (). list () для хранения всех индексов, в которых целевой объект и искомый объект существуют в одной плоскости.

    def do_multi_find_connections(self, target,searchList):
        work_queue = multiprocessing.JoinableQueue()
        #results= multiprocessing.Queue()

        cpu_count = multiprocessing.cpu_count()
        results = multiprocessing.Manager().list()
        range_per_process = len(searchList) // cpu_count
        start,end = 0, range_per_process + (len(searchList) % cpu_count)
        for i in range(cpu_count):
            worker = triangleListWorker(work_queue,target,results,start)
            worker.daemon = True
            worker.start()
        for x in range(cpu_count):
            searchsub = [searchList[x] for x in range(start,end)]
            work_queue.put(searchList[start:end])
            #work_queue.put(searchList[start:end])
            start,end = end, end + range_per_process
            print(start,end)

        work_queue.join()
        print( "can continue...")

        return results

    def find_connections(self, triangle_list,doMultiProcessing):
        tlist = [x for x in triangle_list]
        print("len tlist", len(tlist))
        results = []
        self.byPlane = []
        if doMultiProcessing:
            while len(tlist) > 0:
                results = []
                target = tlist[0]
                #print("target",tcopy[0])
                self.do_multi_find_connections(target,tlist)

                results = self.do_multi_find_connections(target,tlist)#list of indexes
                plane = []

                print(len(results))
                print(results)
                for x in results:
                    plane.append(tlist[x])
                new_tlist = [tlist[x] for x in range(len(tlist)) if not x in results]
                print(len(new_tlist))
                tlist = new_tlist

                self.byPlane.append(plane)

##                self.byPlane.append(plane)
##                tlist = []

Предполагается, что этот код (возможно, немного уродливый)выполнить цикл, чтобы найти следующую плоскость, и исчерпать все остальное, что находится в плоскости, вызвав функцию над ней (которая выполняет многопроцессорную обработку).

Работает в Ubuntu 11.04 64, python 3.2.

Ответы [ 2 ]

1 голос
/ 27 августа 2011

Вместо использования цикла я думаю, что предполагаемый шаблон для модуля multiprocessing - создать Pool и использовать метод Pool.map_async. IOW, конвертируйте ваш цикл в какой-то итератор (вероятно, метод generator ). Затем передайте эквивалент вашего do_search метода в качестве функции и вашего итератора в map_async.

0 голосов
/ 21 февраля 2013

Вы можете использовать класс Pool в многопроцессорной среде:

from multiprocessing import Pool
pool = Pool(processes=5)
valuesProcessed = pool.map(someFunction, valuesToProcess)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...