Напишите python с joblib параллельно в списке - PullRequest
0 голосов
/ 24 февраля 2020

Я использую joblib для параллельной работы, я хочу записать результаты параллельно в список.

Чтобы избежать проблем, я заранее создаю список ldata = [], чтобы он мог быть легко доступным.

Во время распараллеливания данные доступны в списке, но больше не могут быть объединены.

Как можно сохранять данные параллельно?

from joblib import Parallel, delayed
import multiprocessing

data = []

def worker(i):    
    ldata = []
    ...                     # create list ldata    
    data[i].append(ldata)

for i in range(0, 1000):
    data.append([])

num_cores = multiprocessing.cpu_count()
Parallel(n_jobs=num_cores)(delayed(worker)(i) for i in range(0, 1000))

resultlist = []

for i in range(0, 1000):
    resultlist.extend(data[i])

1 Ответ

0 голосов
/ 24 февраля 2020

Вы должны рассматривать Parallel как параллельную map операцию, которая не учитывает побочные эффекты. Модель выполнения Parallel состоит в том, что по умолчанию она запускает новые рабочие копии главных процессов, сериализует входные данные, отправляет их рабочим, заставляет их выполнять итерацию по ним, а затем собирает возвращаемые значения. Любое изменение, выполняемое работником в data, остается в его собственном пространстве памяти и, таким образом, невидимо для основного процесса. У вас есть два варианта:

Во-первых, ваши работники могут вернуть ldata вместо обновления data[i]. В этом случае data должен быть присвоен результат, возвращаемый Parallel(...)(...):

def worker(i):
   ...
   return ldata

data = Parallel(n_jobs=num_cores)(delayed(worker)(i) for i in range(0, 1000))

Второй вариант - принудительно использовать семантику совместно используемой памяти, в которой вместо процессов используются потоки. Когда работы выполняются в потоках, их пространство памяти - это пространство главного процесса, в котором изначально находится data. Чтобы применить эту семантику, добавьте require='sharedmem' аргумент ключевого слова в вызове к Parallel:

Parallel(n_jobs=num_cores, require='sharedmem')(delayed(worker)(i) for i in range(0, 1000))

Различные режимы и семантика описаны в документации joblib здесь .

Имейте в виду, что ваша worker() функция написана в чистом виде Python и поэтому интерпретируется. Это означает, что рабочие потоки не могут работать полностью одновременно, даже если на процессор приходится только один поток из-за страшной глобальной блокировки интерпретатора (GIL). Это также объясняется в документации. Поэтому вам лучше придерживаться первого решения в целом, несмотря на издержки сортировки и межпроцессного взаимодействия.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...