Многопроцессорная обработка
передает задачи (включая check_one
и data
) рабочим процессам через mp.SimpleQueue
. В отличие от Queue.Queue
s, все, что находится в mp.SimpleQueue
, должно быть отборным. Queue.Queue
s нельзя выбрать:
import multiprocessing as mp
import Queue
def foo(queue):
pass
pool=mp.Pool()
q=Queue.Queue()
pool.map(foo,(q,))
дает это исключение:
UnpickleableError: Cannot pickle <type 'thread.lock'> objects
Ваш data
включает packages
, который является Queue.Queue. Это может быть источником проблемы.
Вот возможный обходной путь: Queue
используется для двух целей:
- , чтобы узнать приблизительный размер (позвонив по номеру
qsize
)
- для сохранения результатов для последующего извлечения.
Вместо вызова qsize
, чтобы разделить значение между несколькими процессами, мы могли бы использовать mp.Value
.
Вместо сохранения результатов в очереди мы можем (и должны) просто возвращать значения из вызовов в check_one
. pool.map
собирает результаты в очередь своего собственного производства и возвращает результаты как возвращаемое значение pool.map
.
Например:
import multiprocessing as mp
import Queue
import random
import logging
# logger=mp.log_to_stderr(logging.DEBUG)
logger = logging.getLogger(__name__)
qsize = mp.Value('i', 1)
def check_one(args):
total, package, version = args
i = qsize.value
logger.info('\r[{0:.1%} - {1}, {2} / {3}]'.format(
i / float(total), package, i, total))
new_version = random.randrange(0,100)
qsize.value += 1
if new_version > version:
return (package, version, new_version, None)
else:
return None
def update():
logger.info('Searching for updates')
set_len=10
data = ( (set_len, 'project-{0}'.format(i), random.randrange(0,100))
for i in range(set_len) )
pool = mp.Pool()
results = pool.map(check_one, data)
pool.close()
pool.join()
for result in results:
if result is None: continue
package, version, new_version, json = result
txt = 'A new release is avaiable for {0}: {1!s} (old {2}), update'.format(
package, new_version, version)
logger.info(txt)
logger.info('Updating finished successfully')
if __name__=='__main__':
logging.basicConfig(level=logging.DEBUG)
update()