Я пытаюсь создать простой пул с рабочими в Python, чтобы получить значения от итератора в главном потоке и, таким образом, обновить этот итератор. (Цель состоит в том, чтобы распараллелить итератор при использовании его результатов в главном потоке)
import multiprocessing as mp
pool = mp.Pool(workers, worker, (sourceQueue, batchQueue, iterator))
#details are given below
Но по какой-то причине Pool
, похоже, создает копии итератора для каждого потока, а не просто обновляет егов основной теме. (Вопросы находятся в конце поста)
Итератор
Итак, это итератор, который я пытаюсь распараллелить. Я уверен, что безопасно брать элементы из него параллельно и что обновленные значения не используются при получении элементов:
class TrySeq(object):
def __init__(self):
print('created iterator')
self.gotten = 0 #a simple counter informing how many items were gotten
def __len__(self):
return 10
def __getitem__(self, i):
time.sleep(3) #simulate a heavy operation
#must update the gotten count but this value won't affect the values of the items
self.gotten += 1
print('Iterator: got item', i, ' - gotten total: ', self.gotten)
return (i, i)
Генератор для распараллеливания
Теперь, это генераторэто обернет этот итератор, чтобы распараллелить его «невидимо».
Работает довольно хорошо, работает именно так, как я ожидал, за исключением обновления значения gotten
. (Я знаю, что ожидание синхронизации в каждом epoch
, это не проблема для этого вопроса).
#A generator that wraps an iterator and loads items assynchronously
def ParallelIterator(iterator, epochs, shuffle, workers = 4, queue_size = 10):
sourceQueue = mp.Queue() #queue for getting batch indices
batchQueue = mp.Queue(maxsize = queue_size) #queue for getting actual batches
indices = np.arange(len(iterator)) #array of indices to be shuffled
#fills the batch indices queue (called when sourceQueue is empty -> a few batches before an epoch ends)
def fillSource():
#print("Iterator: fill source - source qsize = ", sourceQueue.qsize() )
if shuffle == True:
np.random.shuffle(indices)
#puts the indices in the indices queue
for i in indices:
sourceQueue.put(i)
#function that will load batches from the iterator
def worker(indicesQueue, destinationQueue, itera):
while True:
index = indicesQueue.get(block = True) #get index from the queue
item = itera[index] #get batch from the iterator
destinationQueue.put((index,item), block=True) #puts batch in the batch queue
#creates the thread pool that will work automatically as we get from the batch queue
pool = mp.Pool(workers, worker, (sourceQueue, batchQueue, iterator))
#generation loop
for epoch in range(epochs):
fillSource()
for batch in range(len(iterator)):
#yields batches for the outside loop that is using this generator
originalIndex, batchItems = batchQueue.get(block = True)
yield epoch, batch, originalIndex, batchItems
pool.close()
sourceQueue.close()
batchQueue.close()
del pool
del sourceQueue
del batchQueue
Кажется, Pool
просто копирует итератор для каждого потока, но я хочу, чтобы все потоки обновляли один и тот же генератор в главном потоке
Использование генератора:
Идея состоит в том, чтобы использовать его очень просто, например:
#outside loop:
for e, b, oB, xAndY in ParallelIterator(TrySeq(), 3, True, workers = 3):
time.sleep(1)
#print('yield e:', e, " - b:", b, " origB: ", oB, "- data:", xAndY)
Токовые выходы:
Теперь, когда я запускаю это, я вижу, что он имеет gotten
значение для каждого работников вместо основного gotten
значения, как ожидалось:
created iterator
Iterator: got item 8 - gotten total: 1
Iterator: got item 2 - gotten total: 1
Iterator: got item 0 - gotten total: 1
Iterator: got item 1 - gotten total: 2
Iterator: got item 7 - gotten total: 2
Iterator: got item 6 - gotten total: 2
Iterator: got item 9 - gotten total: 3
Iterator: got item 5 - gotten total: 3
Iterator: got item 3 - gotten total: 3
Iterator: got item 4 - gotten total: 4
Iterator: got item 4 - gotten total: 4
Iterator: got item 2 - gotten total: 5
Iterator: got item 3 - gotten total: 4
Iterator: got item 6 - gotten total: 5
Iterator: got item 7 - gotten total: 5
Iterator: got item 5 - gotten total: 6
Iterator: got item 1 - gotten total: 6
Iterator: got item 9 - gotten total: 7
Iterator: got item 0 - gotten total: 6
Iterator: got item 8 - gotten total: 7
Iterator: got item 7 - gotten total: 8
Iterator: got item 8 - gotten total: 7
Iterator: got item 2 - gotten total: 8
Iterator: got item 3 - gotten total: 8
Iterator: got item 9 - gotten total: 9
Iterator: got item 1 - gotten total: 9
Iterator: got item 6 - gotten total: 9
Iterator: got item 4 - gotten total: 10
Iterator: got item 0 - gotten total: 10
Iterator: got item 5 - gotten total: 10
finished
Вопросы
- Почему это происходит?
- Как мне обновить
ParallelIterator
, чтобы он действовал на основной iterator
вместо создания одной копии на поток?