Как создать асинхронную многопроцессорную JobQueue в Python? - PullRequest
0 голосов
/ 13 июня 2019

Я пытаюсь создать Python 'JobQueue', который асинхронно выполняет вычислительные задачи на локальном компьютере, с механизмом, который возвращает результаты каждой задачи в основной процесс.multiprocessing.Pool в Python имеет функцию apply_async(), которая удовлетворяет этим требованиям, принимая произвольную функцию, ее множественные аргументы и функции обратного вызова, которые возвращают результаты.Например ...

    import multiprocessing

    pool = multiprocessing.Pool(poolsize)
    pool.apply_async(func, args=args, 
                     callback=mycallback,
                     error_callback=myerror_callback)

Единственная проблема состоит в том, что функция, заданная для apply_async(), должна быть , сериализуемой с Pickle , а функции, которые мне нужно запускать одновременно, - нет.К вашему сведению, причина в том, что целевая функция является членом объекта, который содержит объект IDL, например:

    from idlpy import IDL
    self.idl_obj = IDL.obj_new('ImageProcessingEngine')

Это сообщение об ошибке, полученное в строке pool.apply_async():

'Can't pickle local object 'IDL.__init__.<locals>.run''

Что я пытался

Я сделал простую реализацию JobQueue (см. Ниже), которая прекрасно работает в Python 3.6+, предоставил объект Job и этоrun() метод является Pickleable (что они в примере ниже).Круто то, что основной процесс может получать произвольно сложный объем данных, возвращаемых из асинхронно выполняемой функции через функцию обратного вызова.

Я пытался использовать pathos.pools.ProcessPool, поскольку он использует dill вместо pickle.Однако у него нет метода, подобного apply_async().Я хотел бы провести рефакторинг JobQueue.add(), чтобы обойти эту проблему сериализации.

Существуют ли какие-либо другие опции или сторонние библиотеки, которые предоставляют эту функциональность, используя dill или каким-либо другим способом?

1 Ответ

0 голосов
/ 16 июля 2019

Как насчет создания функции-заглушки, которая создаст экземпляр конечной точки IDL как статическая переменная функции ?

Обратите внимание, что это всего лишь набросок кода, поскольку трудно сказатьот вопроса, передаете ли вы объекты IDL в качестве параметров в функцию, которую вы выполняете параллельно, или она служит другой цели.

def stub_fun(paramset):
    if 'idl_obj' not in dir(stub_fun):  # instantiate once
        stub_fun.idl_obj = IDL.obj_new('ImageProcessingEngine')

    return stub_fun.idl_obj(paramset)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...