Как назначить обратный вызов внутри собственного класса, используя многопроцессорный пул Python с функцией apply_async - PullRequest
0 голосов
/ 13 июня 2019

Я пытался использовать многопоточный пул, как в этом вопросе. Но я хочу упаковать всю логику в свой класс, как показано ниже. Проблема возникает в apply_async функции обратного вызова. Когда я упаковываю всю логику в классе, функция обратного вызова, кажется, никогда не вызывается. Я не знаю, как назначить функции обратного вызова, чтобы он вызывался правильно. В исходном вопросе есть только result в log_result параметрах, но я должен добавить дополнительные self параметры.

import numpy 
import pandas as pd 
import multiprocessing as mp 
from multiprocessing import freeze_support

class MutliThread() :
    def __init__(self):
        self.result_list = []

    def foo_pool(index, number):
        data = []
        notFound = []
        try :        
            data.append(index + number)
        except Exception:
            notFound.append(index + number)    
        return data

    def log_result(self, result):
        # This is called whenever foo_pool(i) returns a result.
        # result_list is modified only by the main process, not the pool workers.
        self.result_list.append(self, result)

    def apply_async_with_callback(self):
        pool = mp.Pool()
        data = [1,2,3,4,5,6]
        for index, tarrif in enumerate(data) :
            pool.apply_async(self.foo_pool, args = (index, tarrif), callback = self.log_result)
        pool.close()
        pool.join()
        print(self.result_list)

if __name__ == '__main__':
    freeze_support()
    multiThread = MutliThread()
    multiThread.apply_async_with_callback()

1 Ответ

1 голос
/ 13 июня 2019

Обратный вызов в вашем примере не вызывается, потому что задачи не выполняются.error_callback будет вызываться с TypeError для каждой из задач: foo_pool() takes 2 positional arguments but 3 were given.

Вы должны либо сделать foo_pool обычным методом, добавив self в качестве первого параметра..

def foo_pool(self, index, number):

... или путем украшения его @staticmethod:

@staticmethod
def foo_pool(index, number):

Исправление этого приведет к сбою log_result, потому что вы вызываете list.append с двумя аргументамив то время как для этого требуется только один.

Либо оберните self и result в структуру данных, например, кортеж ...

self.result_list.append((self, result))

... или пропустите добавление selfвсе вместе.В конце концов, это всегда будет ваш MultiThread экземпляр:

self.result_list.append(result)


Кстати, имя MultiThread вводит в заблуждение.Ваш класс оборачивает пул процессов, а не пул потоков.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...