Python многопроцессорная обработка с большими объектами: предотвращение копирования / сериализации объекта - PullRequest
2 голосов
/ 14 января 2020

Я реализовал многопроцессорную обработку для некоторых проблем с более крупными объектами, такими как:

import time
import pathos.multiprocessing as mp
from functools import partial
from random import randrange


class RandomNumber():
    def __init__(self, object_size=100):
        self.size = bytearray(object_size*10**6)  # 100 MB size
        self.foo = None

    def do_something(self, *args, **kwargs):
        self.foo = randrange(1, 10)
        time.sleep(0.5)  # wait for 0.5 seconds
        return self


def wrapper(random_number, *args, **kwargs):
    return random_number.do_something(*args, **kwargs)


if __name__ == '__main__':
    # create data
    numbers = [RandomNumber() for m in range(0, 9)]
    kwds = {'add': randrange(1, 10)}

    # calculate
    pool = mp.Pool(processes=mp.cpu_count())
    result = pool.map_async(partial(wrapper, **kwds), numbers)
    try:
        result = result.get()
    except:
        pass

    # print result
    my_results = [i.foo for i in result]
    print(my_results)

    pool.close()
    pool.join()

, что приводит к чему-то вроде:

[8, 7, 8, 3, 1, 2, 6, 4, 8]

Теперь проблема в том, что у меня значительное улучшение в производительности по сравнению с использованием понимания списка, когда объекты очень маленькие, и это улучшение превращается в противоположность с более крупными объектами размером, например, 100 МБ и более.

Из документации и других вопросов I обнаружили, что это вызвано использованием рассола / укропа для сериализации отдельных объектов с целью передачи их работникам в пуле. Другими словами: объекты копируются, и эта операция ввода-вывода становится узким местом, поскольку она требует больше времени, чем фактические вычисления.

Я уже пытался работать с тем же объектом, используя многопроцессорную обработку . Менеджер но это привело к еще большему времени выполнения.

Проблема в том, что я привязан к конкретной c структуре класса (здесь представлена ​​через RandomNumber()), которую я не могу изменить ..

Теперь мой вопрос: есть ли способы или концепции, чтобы обойти это поведение и получать мои звонки только на do_something() без затрат на сериализацию или копирование?

Любые подсказки приветствуются. Заранее спасибо!

Ответы [ 2 ]

0 голосов
/ 14 января 2020

Я нашел решение, использующее многопроцессорность или многопоточность из библиотеки concurrent.futures , которая не требует выбора объектов. В моем случае многопоточность с использованием ThreadPoolExecutor дает явное преимущество по сравнению с многопроцессорной обработкой с помощью ProcessPoolExecutor.

import time
from random import randrange
import concurrent.futures as cf


class RandomNumber():
    def __init__(self, object_size=100):
        self.size = bytearray(object_size*10**6)  # 100 MB size
        self.foo = None

    def do_something(self, *args, **kwargs):
        self.foo = randrange(1, 10)
        time.sleep(0.5)  # wait for 0.5 seconds
        return self


def wrapper(random_number, *args, **kwargs):
    return random_number.do_something(*args, **kwargs)


if __name__ == '__main__':
    # create data
    numbers = [RandomNumber() for m in range(0, 100)]
    kwds = {'add': randrange(1, 10)}

    # run
    with cf.ThreadPoolExecutor(max_workers=3) as executor:
        result = executor.map(wrapper, numbers, timeout=5*60)

    # print result
    my_results = [i.foo for i in result]
    print(my_results)

, что дает:

[3, 3, 1, 1, 3, 7, 7, 6, 7, 5, 9, 5, 6, 5, 6, 9, 1, 5, 1, 7, 5, 3, 6, 2, 9, 2, 1, 2, 5, 1, 7, 9, 2, 9, 4, 9, 8, 5, 2, 1, 7, 8, 5, 1, 4, 5, 8, 2, 2, 5, 3, 6, 3, 2, 5, 3, 1, 9, 6, 7, 2, 4, 1, 5, 4, 4, 4, 9, 3, 1, 5, 6, 6, 8, 4, 4, 8, 7, 5, 9, 7, 8, 6, 2, 3, 1, 7, 2, 4, 8, 3, 6, 4, 1, 7, 7, 3, 4, 1, 2]

real    0m21.100s
user    0m1.100s
sys 0m2.896s

Тем не менее, это все же приводит к утечке памяти в случаях где у меня слишком много объектов (здесь numbers), и это не мешает переходить в какой-то «пакетный режим», если нужно поменять память, т. е. система зависает до завершения задачи.

Любые подсказки как это предотвратить?

0 голосов
/ 14 января 2020

Вам нужно использовать Batch processing. Не создавать уничтожать рабочих для каждого числа. Сделайте ограниченных рабочих на основе cpu_count. Затем передайте список каждому обработанному и обработайте их. Используйте map и передайте список, содержащий batches чисел.

...