`multiprocessing`` starmap_async` вызывает только один раз обратный вызов? - PullRequest
0 голосов
/ 27 апреля 2019

У меня есть следующий код, который создает пул для 4 рабочих и вызывает рабочий метод.код работает нормально по большей части.во время бега я вижу, что для обработки работы вызываются разные работники.Однако calc_completed никогда не вызывается один раз в самом конце, когда все работники готовы.это ожидаемое поведение?Я ожидал, что обратный вызов произойдет, когда каждый рабочий будет завершен.

def calculate_worker(x, y):
    print 'working...'
    ...

def calc_completed(result):
    print 'completed: %s'%str(result)

def calc_errored(result):
    print 'error: %s'%str(result)

if __name__ == '__main__':  
    start, stop, step = 1, 1000, 1
    ranges = [(n, min(n+step, stop)) for n in xrange(start, stop, step)]

    pool = mp.Pool(processes=8)

    res = pool.starmap_async(calculate_worker, ranges,
                             callback=calculate_worker, error_callback=calc_completed)  

    pool.close()
    pool.join()
    d = res.get()       
    print(d)

1 Ответ

0 голосов
/ 27 апреля 2019

calc_completed is вызывается только в случае возникновения ошибки при выполнении сопоставленной функции (здесь: calculate_worker).

Другая проблема в вашем коде заключается в том, что вы оба параллельно запускаете calculate_worker функцию и используете ее как callback.Это не имеет особого смысла, так как calculate_worker будет вызываться дважды - во-первых: как рабочий , а во-вторых: как функция, сообщающая о завершении расчета.Там у вас должно быть две разные функции.

С учетом функций из предоставленного вами фрагмента я изменил бы его следующим образом:

res = pool.starmap_async(calculate_worker, ranges,
                         callback=calc_completed, 
                         error_callback=calc_errored)  

Если вы хотите проверить, вызывается ли calc_erroredсоответственно, вы можете ввести некоторые случайные ошибки в функцию calculate_worker, чтобы посмотреть, будет ли она обрабатываться, например,

def calculate_worker(x, y):
    if (x % 7):
      x / (y - y)  # division by zero
    print 'working...'
...