Почему работники создают копии объектов из основного потока? - PullRequest
0 голосов
/ 08 октября 2019

Я пытаюсь создать простой пул с рабочими в Python, чтобы получить значения от итератора в главном потоке и, таким образом, обновить этот итератор. (Цель состоит в том, чтобы распараллелить итератор при использовании его результатов в главном потоке)

 import multiprocessing as mp

 pool = mp.Pool(workers, worker, (sourceQueue, batchQueue, iterator))
 #details are given below

Но по какой-то причине Pool, похоже, создает копии итератора для каждого потока, а не просто обновляет егов основной теме. (Вопросы находятся в конце поста)

Итератор

Итак, это итератор, который я пытаюсь распараллелить. Я уверен, что безопасно брать элементы из него параллельно и что обновленные значения не используются при получении элементов:

class TrySeq(object):
    def __init__(self):
        print('created iterator')
        self.gotten = 0 #a simple counter informing how many items were gotten
    def __len__(self):
        return 10
    def __getitem__(self, i):
        time.sleep(3) #simulate a heavy operation

        #must update the gotten count but this value won't affect the values of the items
        self.gotten += 1 
        print('Iterator: got item', i, ' - gotten total: ', self.gotten)
        return (i, i)

Генератор для распараллеливания

Теперь, это генераторэто обернет этот итератор, чтобы распараллелить его «невидимо».

Работает довольно хорошо, работает именно так, как я ожидал, за исключением обновления значения gotten. (Я знаю, что ожидание синхронизации в каждом epoch, это не проблема для этого вопроса).

#A generator that wraps an iterator and loads items assynchronously   
def ParallelIterator(iterator, epochs, shuffle, workers = 4, queue_size = 10):

    sourceQueue = mp.Queue()                     #queue for getting batch indices
    batchQueue = mp.Queue(maxsize = queue_size)  #queue for getting actual batches 
    indices = np.arange(len(iterator))     #array of indices to be shuffled

    #fills the batch indices queue (called when sourceQueue is empty -> a few batches before an epoch ends)
    def fillSource():
        #print("Iterator: fill source - source qsize = ", sourceQueue.qsize() )
        if shuffle == True:
            np.random.shuffle(indices)

        #puts the indices in the indices queue
        for i in indices:
            sourceQueue.put(i)

    #function that will load batches from the iterator
    def worker(indicesQueue, destinationQueue, itera):
        while True:
            index = indicesQueue.get(block = True) #get index from the queue
            item = itera[index] #get batch from the iterator
            destinationQueue.put((index,item), block=True) #puts batch in the batch queue


    #creates the thread pool that will work automatically as we get from the batch queue
    pool = mp.Pool(workers, worker, (sourceQueue, batchQueue, iterator))

    #generation loop
    for epoch in range(epochs):
        fillSource()
        for batch in range(len(iterator)):

            #yields batches for the outside loop that is using this generator
            originalIndex, batchItems = batchQueue.get(block = True)
            yield epoch, batch, originalIndex, batchItems

    pool.close()
    sourceQueue.close()
    batchQueue.close()
    del pool
    del sourceQueue
    del batchQueue

Кажется, Pool просто копирует итератор для каждого потока, но я хочу, чтобы все потоки обновляли один и тот же генератор в главном потоке

Использование генератора:

Идея состоит в том, чтобы использовать его очень просто, например:

#outside loop: 
for e, b, oB, xAndY in ParallelIterator(TrySeq(), 3, True, workers = 3):
    time.sleep(1)
    #print('yield e:', e, " - b:", b, " origB: ", oB, "- data:", xAndY)

Токовые выходы:

Теперь, когда я запускаю это, я вижу, что он имеет gottenзначение для каждого работников вместо основного gotten значения, как ожидалось:

created iterator
Iterator: got item 8  - gotten total:  1
Iterator: got item 2  - gotten total:  1
Iterator: got item 0  - gotten total:  1
Iterator: got item 1  - gotten total:  2
Iterator: got item 7  - gotten total:  2
Iterator: got item 6  - gotten total:  2
Iterator: got item 9  - gotten total:  3
Iterator: got item 5  - gotten total:  3
Iterator: got item 3  - gotten total:  3
Iterator: got item 4  - gotten total:  4
Iterator: got item 4  - gotten total:  4
Iterator: got item 2  - gotten total:  5
Iterator: got item 3  - gotten total:  4
Iterator: got item 6  - gotten total:  5
Iterator: got item 7  - gotten total:  5
Iterator: got item 5  - gotten total:  6
Iterator: got item 1  - gotten total:  6
Iterator: got item 9  - gotten total:  7
Iterator: got item 0  - gotten total:  6
Iterator: got item 8  - gotten total:  7
Iterator: got item 7  - gotten total:  8
Iterator: got item 8  - gotten total:  7
Iterator: got item 2  - gotten total:  8
Iterator: got item 3  - gotten total:  8
Iterator: got item 9  - gotten total:  9
Iterator: got item 1  - gotten total:  9
Iterator: got item 6  - gotten total:  9
Iterator: got item 4  - gotten total:  10
Iterator: got item 0  - gotten total:  10
Iterator: got item 5  - gotten total:  10
finished

Вопросы

  • Почему это происходит?
  • Как мне обновить ParallelIterator, чтобы он действовал на основной iterator вместо создания одной копии на поток?

1 Ответ

2 голосов
/ 08 октября 2019

Вы не показываете свой импорт, но я предполагаю, что у вас есть:

import multiprocessing as mp

в верхней части файла. multiprocessing не поддерживается потоками, оно поддерживается fork ed или порожденными процессами, каждый из которых имеет независимую память и независимые переменные. При инициализации выполняется выборка значений (что важно, iterator), а затем отбор новых копий каждого значения и их использование отдельно в каждом рабочем процессе (примечание: в системах, которые fork, а не порождают рабочие,травление может быть не задействовано, но эффект тот же: исходные данные «моментально снимаются» во время fork, и каждый рабочий процесс наследует свой собственный независимый снимок данных, не оставляя связей с данными в других процессах. ).

Если вы намеревались использовать потоки, измените свой импорт на:

import multiprocessing.dummy as mp

, который изменяет базовую реализацию на пул на основе потоков, а не на пул на основе процессов. Пулы на основе потоков находятся в одном пространстве общей памяти;никакое травление / расслоение или межпроцессное взаимодействие любого рода не участвуют. Недостатком является то, что параллелизм в эталонном интерпретаторе CPython будет ограничен GIL , а большее совместное использование означает, что для предотвращения условий гонки требуется больше синхронизации.

Если вам нужны процессы, то это происходитбыть королевской болью, так как вы фактически застряли с реализацией прокси-оболочек для вашего типа итератора, чтобы сделал его multiprocessing.Manager совместимым , что будет королевской болью.

...