Как сказал кто-то другой, вам нужно использовать несколько процессов для истинного параллелизма вместо потоков, потому что ограничение GIL препятствует одновременной работе потоков.
Если вы хотите использовать стандартную многопроцессорность библиотеку (которая основана на запуске нескольких процессов), я предлагаю использовать пул рабочих .Если я правильно понял, вы хотите запустить более 100 параллельных экземпляров.Запуск более 100 процессов на одном хосте создаст слишком много накладных расходов.Вместо этого создайте пул работников P, где P, например, число ядер на вашем компьютере, и отправьте более 100 заданий в пул.Это просто сделать, и в Интернете есть много примеров .Кроме того, когда вы отправляете задания в пул, вы можете предоставить функцию обратного вызова для получения ошибок.Это может быть достаточно для ваших нужд (есть примеры здесь ).
Однако пул в многопроцессорной обработке не может распределить работу по нескольким хостам (например, кластеру машин) в прошлый раз, когда я смотрел.Поэтому, если вам нужно это сделать или если вам нужна более гибкая схема связи, например, возможность отправлять обновления контролирующему процессу во время работы рабочих, я предлагаю использовать charm4py (обратите внимание, чтоЯ разработчик charm4py, поэтому у меня есть опыт.
С помощью charm4py вы можете создавать N рабочих, которые распределяются между процессами P по времени выполнения (работает на нескольких хостах), и рабочие могут общаться сКонтроллер просто делает удаленный вызов метода.Вот небольшой пример:
from charm4py import charm, Chare, Group, Array, ArrayMap, Reducer, threaded
import time
WORKER_ITERATIONS = 100
class Worker(Chare):
def __init__(self, controller):
self.controller = controller
@threaded
def work(self, x, done_future):
result = -1
try:
for i in range(WORKER_ITERATIONS):
if i % 20 == 0:
# send status update to controller
self.controller.progressUpdate(self.thisIndex, i, ret=True).get()
if i == 5 and self.thisIndex[0] % 2 == 0:
# trigger NameError on even-numbered workers
test[3] = 3
time.sleep(0.01)
result = x**2
except Exception as e:
# send error to controller
self.controller.collectError(self.thisIndex, e)
# send result to controller
self.contribute(result, Reducer.gather, done_future)
# This custom map is used to prevent workers from being created on process 0
# (where the controller is). Not strictly needed, but allows more timely
# controller output
class WorkerMap(ArrayMap):
def procNum(self, index):
return (index[0] % (charm.numPes() - 1)) + 1
class Controller(Chare):
def __init__(self, args):
self.startTime = time.time()
done_future = charm.createFuture()
# create 12 workers, which are distributed by charm4py among processes
workers = Array(Worker, 12, args=[self.thisProxy], map=Group(WorkerMap))
# start work
for i in range(12):
workers[i].work(i, done_future)
print('Results are', done_future.get()) # wait for result
exit()
def progressUpdate(self, worker_id, current_step):
print(round(time.time() - self.startTime, 3), ': Worker', worker_id,
'progress', current_step * 100 / WORKER_ITERATIONS, '%')
# the controller can return a value here and the worker would receive it
def collectError(self, worker_id, error):
print(round(time.time() - self.startTime, 3), ': Got error', error,
'from worker', worker_id)
charm.start(Controller)
В этом примере контроллер будет печатать обновления состояния и ошибки по мере их возникновения.Он напечатает окончательные результаты всех работников, когда они все будут сделаны.Результат для рабочих, которые потерпели неудачу, будет -1.
Количество процессов P дается при запуске.Среда выполнения распределяет N рабочих по доступным процессам.Это происходит, когда рабочие создаются, и в этом конкретном примере отсутствует динамическая балансировка нагрузки.
Также обратите внимание, что в модели charm4py удаленный вызов метода асинхронный и возвращает будущее, которое может заблокировать вызывающая сторона, ноблокирует только вызывающий поток (не весь процесс).
Надеюсь, это поможет.