Отправлять массивы dask распределенному клиенту, одновременно используя результаты - PullRequest
0 голосов
/ 05 февраля 2019

У меня есть dask массивы, которые представляют кадры видео и я хочу создать несколько видеофайлов.Я использую библиотеку imageio, которая позволяет мне "добавлять" кадры в подпроцесс ffmpeg.Поэтому у меня может быть что-то вроде этого:

my_frames = [[arr1f1, arr1f2, arr1f3], [arr2f1, arr2f2, arr2f3], ...]

Таким образом, каждый внутренний список представляет кадры для одного видео (или продукта).Я ищу лучший способ отправки / отправки фреймов для вычисления, а также записи фреймов в imageio по мере их завершения (по порядку).Чтобы сделать его более сложным, приведенные выше внутренние списки фактически являются генераторами и могут состоять из 100 или 1000 кадров.Также имейте в виду, что из-за того, как работает imageio, я думаю, что он должен существовать в одном процессе.Вот упрощенная версия того, что я работаю до сих пор:

for frame_arrays in frames_to_write:
    # 'frame_arrays' is [arr1f1, arr2f1, arr3f1, ...]
    future_list = _client.compute(frame_arrays)
    # key -> future
    future_dict = dict(zip(frame_keys, future_list))

    # write the current frame
    # future -> key
    rev_future_dict = {v: k for k, v in future_dict.items()}
    result_iter = as_completed(future_dict.values(), with_results=True)
    for future, result in result_iter:
        frame_key = rev_future_dict[future]
        # get the writer for this specific video and add a new frame
        w = writers[frame_key]
        w.append_data(result)

Это работает, и мой реальный код реорганизован из вышеупомянутого, чтобы передать следующий кадр при написании текущего кадра, так что я думаю, что есть некоторая выгода,Я думаю о решении, в котором пользователь говорит: «Я хочу обрабатывать X кадров одновременно», поэтому я отправляю 50 кадров, пишу 50 кадров, отправляю еще 50 кадров, пишу 50 кадров и т. Д.

MyПосле некоторой работы над этим вопросы:

  1. Когда данные result хранятся в локальной памяти?Когда он возвращается итератором или когда он завершен?
  2. Можно ли сделать что-то подобное с многопоточным планировщиком dask-core, чтобы пользователю не нужно было распространять установленный файл?
  3. Можно ли адаптировать количество отправляемых кадров в зависимости от количества рабочих?
  4. Есть ли способ отправить словарь массивов dask и / или использовать as_completed с включенным ключом frame_key?
  5. Если я загружу всю серию кадров и отправлю их клиенту / кластеру, я, вероятно, убью планировщика, верно?
  6. Используется ли get_client() с последующим Client() на ValueError предпочтительнымспособ получения клиента (если он не предоставлен пользователем)?
  7. Можно ли дать dask / распределенный один или несколько итераторов, из которых он извлекает, когда рабочие становятся доступными?
  8. Я являюсьтупой?Слишком сложно?

Примечание. Это своего рода расширение этой проблемы , которое я сделал некоторое время назад, но немного другое.

1 Ответ

0 голосов
/ 09 февраля 2019

После множества примеров здесь я получил следующее:

    try:
        # python 3
        from queue import Queue
    except ImportError:
        # python 2
        from Queue import Queue
    from threading import Thread

    def load_data(frame_gen, q):
        for frame_arrays in frame_gen:
            future_list = client.compute(frame_arrays)
            for frame_key, arr_future in zip(frame_keys, future_list):
                q.put({frame_key: arr_future})
        q.put(None)

    input_q = Queue(batch_size if batch_size is not None else 1)
    load_thread = Thread(target=load_data, args=(frames_to_write, input_q,))
    remote_q = client.gather(input_q)
    load_thread.start()

    while True:
        future_dict = remote_q.get()
        if future_dict is None:
            break

        # write the current frame
        # this should only be one element in the dictionary, but this is
        # also the easiest way to get access to the data
        for frame_key, result in future_dict.items():
            # frame_key = rev_future_dict[future]
            w = writers[frame_key]
            w.append_data(result)
        input_q.task_done()

    load_thread.join()

Это отвечает на большинство моих вопросов и, кажется, работает так, как я хочу в целом.

...