Я пытаюсь собрать сценарий Python для параллельной обработки.Он использует фьючерсы для выполнения сложных задач обработки, в то время как основной поток строит задачи для их выполнения и помещает их в multiprocessing.Queue
.Есть еще один процесс, который касается результатов фьючерсов (чтение результатов и составление окончательного результата).Я присоединяюсь к будущему в своем шаге сокращения, потому что я могу позволить себе ждать там.Тем не менее, я получаю сообщение об ошибке из-за того, что не могу выбрать объект rthread.
Traceback (most recent call last):
File "/usr/lib64/python3.6/multiprocessing/queues.py", line 234, in _feed
obj = _ForkingPickler.dumps(obj)
File "/usr/lib64/python3.6/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
TypeError: can't pickle _thread.RLock objects
Вот мой код в его нынешнем виде, я не могу придумать идею, которая на самом деле хорошо работает,Порядок результатов имеет значение, если я отправляю задания A и B. Мне нужно, чтобы A вышел первым, а затем B.
У меня были следующие идеи:
- Создание обратного вызоваэто добавляет результат на готово.(Нет гарантии заказа)
- Обычная очередь, которую я жду, когда все они будут выполнены, затем отправляю материал в рабочую очередь.(сложнее)
Должно быть простое решение, которого я здесь не вижу.
# Does video encoding in the background
def worker():
fourcc = cv2.VideoWriter_fourcc(*'XVID')
out = cv2.VideoWriter('out.avi', fourcc, 10, (1000, 75))
n = 0;
while True:
task = workqueue.get()
print(task)
img = cv2.imread(task)
out.write(img)
os.remove(task)
workqueue.task_done()
out.release()
cv2.destoryAllWindows()
workqueue = JoinableQueue(maxsize=50)
with open('runtime_trace.csv') as f:
t = Process(target=worker)
t.daemon = True
t.start()
max_size = get_max_alloc_size()
reader = csv.DictReader(f, ['i', 'addr', 'size', 'alloc'])
last_val = 0
vals = []
with concurrent.futures.ProcessPoolExecutor() as exe:
for i,d in enumerate(reader):
d = fix_types(d)
if d['i'] != last_val:
future = exe.submit(plot, vals, last_val, max_size)
workqueue.put(future)
vals = []
vals.append(d)
last_val = d['i']
workqueue.join()