Есть ли какой-нибудь способ поставить фьючерс на питон в fifo? - PullRequest
0 голосов
/ 14 октября 2018

Я пытаюсь собрать сценарий Python для параллельной обработки.Он использует фьючерсы для выполнения сложных задач обработки, в то время как основной поток строит задачи для их выполнения и помещает их в multiprocessing.Queue.Есть еще один процесс, который касается результатов фьючерсов (чтение результатов и составление окончательного результата).Я присоединяюсь к будущему в своем шаге сокращения, потому что я могу позволить себе ждать там.Тем не менее, я получаю сообщение об ошибке из-за того, что не могу выбрать объект rthread.

Traceback (most recent call last):
  File "/usr/lib64/python3.6/multiprocessing/queues.py", line 234, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/usr/lib64/python3.6/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: can't pickle _thread.RLock objects

Вот мой код в его нынешнем виде, я не могу придумать идею, которая на самом деле хорошо работает,Порядок результатов имеет значение, если я отправляю задания A и B. Мне нужно, чтобы A вышел первым, а затем B.

У меня были следующие идеи:

  • Создание обратного вызоваэто добавляет результат на готово.(Нет гарантии заказа)
  • Обычная очередь, которую я жду, когда все они будут выполнены, затем отправляю материал в рабочую очередь.(сложнее)

Должно быть простое решение, которого я здесь не вижу.

# Does video encoding in the background
def worker():
    fourcc = cv2.VideoWriter_fourcc(*'XVID')
    out = cv2.VideoWriter('out.avi', fourcc, 10, (1000, 75))
    n = 0;
    while True:
        task = workqueue.get()
        print(task)
        img = cv2.imread(task)
        out.write(img)
        os.remove(task)
        workqueue.task_done()
    out.release()
    cv2.destoryAllWindows()

workqueue = JoinableQueue(maxsize=50)
with open('runtime_trace.csv') as f:
    t = Process(target=worker)
    t.daemon = True
    t.start()
    max_size = get_max_alloc_size()
    reader = csv.DictReader(f, ['i', 'addr', 'size', 'alloc'])
    last_val = 0 
    vals = []

    with concurrent.futures.ProcessPoolExecutor() as exe:
        for i,d in enumerate(reader):
            d = fix_types(d)
            if d['i'] != last_val:
                future = exe.submit(plot, vals, last_val, max_size)
                workqueue.put(future)
                vals = []
            vals.append(d)
            last_val = d['i']

        workqueue.join()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...