Выполнить метод в каждом объекте в списке параллельно - PullRequest
0 голосов
/ 27 ноября 2018

У меня есть список объектов, и я хочу выполнить метод в каждом объекте параллельно.Метод изменяет атрибуты объектов.Например:

class Object:
    def __init__(self, a):
        self.a = a
    def aplus(self):
        self.a += 1

object_list = [Object(1), Object(2), Object(3)]

# I want to execute this in parallel
for i in range(len(object_list)):
    object_list[i].aplus() 

Я попробовал следующее:

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

executor = ProcessPoolExecutor(max_workers=3)
res = executor.map([obj.aplus for obj in object_list])

Что не работает, оставляя объекты без изменений.Я предполагаю, что это потому, что объекты могут быть скопированы, а не доступны, с помощью многопроцессорной обработки.Любая идея?

Большое спасибо!

РЕДАКТИРОВАТЬ: предположительно, объекты очень большие, поэтому было бы предпочтительнее избегать копирования их в каждый процесс.Методы также предположительно сильно нагружают процессор, поэтому следует использовать несколько процессов, а не потоков.В этих условиях я считаю, что решения не существует, поскольку многопроцессорная обработка не может совместно использовать память, а потоки не могут использовать несколько процессоров.Я хотел бы, чтобы меня показывали неправильно.

Ответы [ 3 ]

0 голосов
/ 27 ноября 2018

Вот мой ответ, используя threading:

from threading import Thread

class Object:
    def __init__(self, a):
        self.a = a
    def aplus(self):
        self.a += 1

object_list = [Object(1), Object(2), Object(3)]

# A list containing all threads we will create
threads = []

# Create a thread for every objects
for obj in object_list:
    thread = Thread(target=obj.aplus)
    thread.daemon = True
    thread.start()
    threads.append(thread)

# Wait for all threads to finish before continuing
for thread in threads:
    thread.join();

# prints results
for obj in object_list:
    print(obj.a)
0 голосов
/ 27 ноября 2018

Я предполагаю, что это потому, что объекты могут быть скопированы и не доступны только при многопроцессорной обработке.

Это совершенно верно, и это половина ответа.Поскольку процессы изолированы, каждый из них имеет свою собственную копию object_list.Одним из решений здесь является использование ThreadPoolExecutor (все потоки имеют одинаковый object_list).

Синтаксис для его использования немного отличается от того, что вы пытаетесь использовать, но это работает как задумано:

executor = ThreadPoolExecutor(max_workers=3)
res = executor.map(Object.aplus, object_list)

Если вы действительно хотите использовать ProcessPoolExecutor, вам нужно каким-то образом получить данные от процессов.Самый простой способ - использовать функции, которые возвращают значения:

from concurrent.futures import ProcessPoolExecutor


class Object:
    def __init__(self, a):
        self.a = a

    def aplus(self):
        self.a += 1
        return self.a


if __name__ == '__main__':

    object_list = [Object(1), Object(2), Object(3)]

    executor = ProcessPoolExecutor(max_workers=3)
    for result in executor.map(Object.aplus, object_list):
        print("I got: " + str(result))

Вы даже можете иметь функцию, которую вы map возвращаете self, и затем помещать эти возвращенные объекты обратно в object_listконец.Таким образом, полное многопроцессорное решение будет выглядеть так:

from concurrent.futures import ProcessPoolExecutor


class Object:
    def __init__(self, a):
        self.a = a

    def aplus(self):
        self.a += 1
        return self


if __name__ == '__main__':

    object_list = [Object(1), Object(2), Object(3)]

    executor = ProcessPoolExecutor(max_workers=3)
    object_list = list(executor.map(Object.aplus, object_list))
0 голосов
/ 27 ноября 2018

Вот рабочий пример использования Pool.map:

import multiprocessing

class Object:
    def __init__(self, a):
        self.a = a

    def aplus(self):
        self.a += 1

    def __str__(self):
        return str(self.a)

def worker(obj):
    obj.aplus()
    return obj

if __name__ == "__main__":
    object_list = [Object(1), Object(2), Object(3)]

    try:
        processes = multiprocessing.cpu_count()
    except NotImplementedError:
        processes = 2

    pool = multiprocessing.Pool(processes=processes)
    modified_object_list = pool.map(worker, object_list)

    for obj in modified_object_list:
        print(obj)

Отпечатки:

2
3
4
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...