Атрибуты экземпляра не сохраняются при использовании многопроцессорной обработки - PullRequest
1 голос
/ 19 марта 2019

У меня проблема с экземплярами, не сохраняющими изменения атрибутов или даже сохраняющими новые созданные атрибуты. Я думаю, что я сузил это до факта, что мой сценарий использует преимущества многопроцессорной обработки, и я думаю, что изменения, происходящие с экземплярами в отдельных потоках процесса, не «запоминаются», когда сценарий возвращается в основной поток.

По сути, у меня есть несколько наборов данных, которые мне нужно обрабатывать параллельно. Данные хранятся в виде атрибута и изменяются с помощью нескольких методов в классе. По завершении обработки я надеюсь вернуться в основной поток и объединить данные из каждого экземпляра объекта. Однако, как описано выше, когда я пытаюсь получить доступ к атрибуту экземпляра с данными после выполнения бита параллельной обработки, там ничего не происходит. Как будто любые изменения, внесенные во время многопроцессорной обработки, «забыты».

Есть ли очевидное решение, чтобы это исправить? Или мне нужно пересобрать мой код, чтобы вместо этого возвращать обработанные данные, а не просто изменять / сохранять их как атрибут экземпляра? Я предполагаю, что альтернативным решением было бы сериализовать данные, а затем перечитать их в случае необходимости, а не просто сохранить их в памяти.

Здесь, возможно, стоит отметить, что я использую модуль pathos, а не модуль multiprocessing python. Я получаю некоторые ошибки, относящиеся к засолке, похожие на следующие: Многопроцессорная обработка Python PicklingError: Can't pickle . Мой код разбит на несколько модулей, и, как уже упоминалось, методы обработки данных содержатся в классе.

Извините за стену текста.

EDIT Вот мой код:

import importlib
import pandas as pd
from pathos.helpers import mp
from provider import Provider

# list of data providers ... length is arbitrary
operating_providers = ['dataprovider1', 'dataprovider2', 'dataprovider3']


# create provider objects for each operating provider
provider_obj_list = []
for name in operating_providers:
    loc     = 'providers.%s' % name
    module  = importlib.import_module(loc)
    provider_obj = Provider(module)
    provider_obj_list.append(provider_obj)

processes = []
for instance in provider_obj_list:
    process = mp.Process(target = instance.data_processing_func)
    process.daemon = True
    process.start()
    processes.append(process)

for process in processes:
    process.join()

# now that data_processing_func is complete for each set of data, 
# stack all the data
stack = pd.concat((instance.data for instance in provider_obj_list))

У меня есть несколько модулей (их имена перечислены в operating_providers), которые содержат атрибуты, специфичные для их источника данных. Эти модули итеративно импортируются и передаются новым экземплярам класса Provider, который я создал в отдельном модуле (provider). Я добавляю каждый экземпляр Provider в список (provider_obj_list), а затем итеративно создаю отдельные процессы, которые вызывают метод экземпляра instance.data_processing_func. Эта функция выполняет некоторую обработку данных (с каждым экземпляром, обращающимся к совершенно разным файлам данных) и создает новые атрибуты экземпляра по пути, к которым мне нужно получить доступ после завершения параллельной обработки.

Я попытался использовать вместо этого многопоточность, а не многопроцессорность - в этом случае атрибуты моего экземпляра сохранились, чего я и хочу. Однако я не уверен, почему это происходит - мне придется изучить различия между многопоточностью и многопроцессорностью.

Спасибо за любую помощь!

1 Ответ

2 голосов
/ 20 марта 2019

Вот пример кода, показывающий, как сделать то, что я изложил в комментарии. Я не могу проверить это, потому что у меня не установлены provider или pathos, но это должно дать вам хорошее представление о том, что я предложил.

import importlib
from pathos.helpers import mp
from provider import Provider

def process_data(loc):
    module  = importlib.import_module(loc)
    provider_obj = Provider(module)
    provider_obj.data_processing_func()


if __name__ == '__main__':

    # list of data providers ... length is arbitrary
    operating_providers = ['dataprovider1', 'dataprovider2', 'dataprovider3']

    # create list of provider locations for each operating provider
    provider_loc_list = []
    for name in operating_providers:
        loc = 'providers.%s' % name
        provider_loc_list.append(loc)

    processes = []
    for loc in provider_loc_list:
        process = mp.Process(target=process_data, args=(loc,))
        process.daemon = True
        process.start()
        processes.append(process)

    for process in processes:
        process.join()
...