Настройка задачи Celery с кешем памяти для видеокадров в качестве стратегии циклического буфера в python - PullRequest
0 голосов
/ 30 октября 2018

Я хочу построить конвейер многозадачной обработки на Celery и хочу, чтобы несколько задач обрабатывали один и тот же видеофайл. Задачи должны поделиться видео данными. Поэтому не каждая задача должна декодировать и извлекать кадры из видеофайла. Видеоданные будут представлять собой список извлеченных кадров (не каждый кадр видео необходим).

Есть ли какое-либо решение для эффективного обмена этими кадрами? Задачи могут обрабатываться на разных узлах. Но я не хочу делиться данными по сети, такими как Memcached или Redis. Задача должна искать видеоданные в памяти / кеше, если ее там нет, задача должна выпустить другую задачу для загрузки видео и извлечения кадров в кеш.

(производитель и несколько потребителей для каждого видеофайла)

Таким образом, задачи на одном узле / машине могут совместно использовать кэшированные данные. Две задачи на разных узлах не имеют преимуществ при кэшировании.

Я не хочу кэшировать все извлеченное видео, должно быть некоторое циклическое буферирование. Кэш на видео имеет фиксированный размер, скажем, 100 кадров. Разрыв между самой быстрой и самой медленной задачей не может превышать 100 кадров. В памяти / кэше всего 100 кадров.

Возникает два основных вопроса:

  1. Настройка задачи

    Задача A: извлечение кадров из видео (производитель в память / кэш)

    Задача B1: использование кадров и выполнение реальной работы (обработка кадров)

    . .

    Задача Bn: потребление кадров и выполнение реальной работы (обработка кадров)

    A, B1 - Bn работает параллельно. Но тогда эти задачи должны выполняться на одном узле. Если B taks распространяются на разных узлах, что-то должно порождать другую задачу A (по одному на каждый узел для декодирования и извлечения кадров). Какой подход вы рекомендуете здесь? Какой будет лучший выбор?

  2. Python кеш

    Существуют ли какие-либо библиотеки / реализации / решения для кэширования, которые лучше всего подходят для моего варианта использования для кэширования больших данных на локальном компьютере с использованием некоторой реализации циклического буфера? Что-то вроде DiskCache , но с возможностью кэширования только 100 кадров путем его кольцевой буферизации.

Какие подходы и конструкции вы рекомендуете для реализации моего варианта использования? Я хотел бы придерживаться Сельдерея для распределения задач.

1 Ответ

0 голосов
/ 30 октября 2018

Это может показывать мое упрямство, но я всегда считал, что такие проекты, как сельдерей, которые добавляют сложность поверх многопроцессорной обработки (которая уже сложна), доставляют больше хлопот, чем они того стоят. Также нет лучшей альтернативы imo использованию совместно используемой памяти stdlib и мьютексов с точки зрения скорости и простоты.

В вашем случае простым решением было бы просто использовать очередь fifo для каждого процесса и поместить в каждый кадр от производителя. Это, естественно, привело бы к большому потреблению памяти, если бы вы делали n копий каждого кадра для n потребителей, однако вы, вероятно, могли бы довольно легко придумать механизм для помещения самих кадров в multiprocessing.sharedctypes.Array и пропускания только индексов через вместо очереди Пока очереди ограничены по длине короче, чем длина буфера, вы должны быть ограничены от перезаписи кадра в буфере, пока он не будет использован всеми потребителями. Без какой-либо синхронизации, это полетело бы у вас под штанами, но немного магии мьютекса определенно могло бы сделать это очень надежным решением.

например:

import numpy as np
from time import sleep
from multiprocessing import Process, freeze_support, Queue
from multiprocessing.sharedctypes import Array
from ctypes import c_uint8
from functools import reduce

BUFSHAPE = (10,10,10) #10 10x10 images in buffer

class Worker(Process):
    def __init__(self, q_size, buffer, name=''):
        super().__init__()
        self.queue = Queue(q_size)
        self.buffer = buffer
        self.name = name

    def run(self,): #do work here
        #I hardcoded datatype here. you might need to communicate it to the child process
        buf_arr = np.frombuffer(self.buffer.get_obj(), dtype=c_uint8)
        buf_arr.shape = BUFSHAPE
        while True:
            item = self.queue.get()
            if item == 'done':
                print('child process: {} completed all frames'.format(self.name))
                return
            with self.buffer.get_lock(): #prevent writing while we're reading
                #slice the frame from the array uning the index that was sent
                frame = buf_arr[item%BUFSHAPE[0]] #depending on your use, you may want to make a copy here
            #do some intense processing on `frame`
            sleep(np.random.rand())
            print('child process: {} completed frame: {}'.format(self.name, item))

def main():
    #creating shared array
    buffer = Array(c_uint8, reduce(lambda a,b: a*b, BUFSHAPE))
    #make a numpy.array using that memory location to make it easy to stuff data into it
    buf_arr = np.frombuffer(buffer.get_obj(), dtype=c_uint8)
    buf_arr.shape = BUFSHAPE
    #create a list of workers
    workers = [Worker(BUFSHAPE[0]-2, #smaller queue than buffer to prevent overwriting frames not yet consumed
                      buffer, #pass in shared buffer array
                      str(i)) #numbered child processes
                      for i in range(5)] #5 workers

    for worker in workers: #start the workers
        worker.start()
    for i in range(100): #generate 100 random frames to send to workers
        #insert a frame into the buffer
        with buffer.get_lock(): #prevent reading while we're writing
            buf_arr[i%BUFSHAPE[0]] = np.random.randint(0,255, size=(10,10), dtype=c_uint8)
        #send the frame number to each worker for processing. If the input queue is full, this will block until there's space
        # this is what prevents `buf_arr[i%BUFSHAPE[0]] = np...` from overwriting a frame that hasn't been processed yet
        for worker in workers:
            worker.queue.put(i)
    #when we're done send the 'done' signal so the child processes exit gracefully (or you could make them daemons)
    for worker in workers:
        worker.queue.put('done')
        worker.join()


if __name__ == "__main__":
    freeze_support()
    main()

РЕДАКТИРОВАТЬ

Для некоторой ошибки типа «один за другим» требуется, чтобы очередь была на 2 кадра меньше буфера, а не на 1 кадр меньше, чтобы предотвратить перезапись кадра раньше времени.

EDIT2 - пояснение первого редактирования:

Причина len(q) = len(buf)-2 заключается в том, что q.get() вызывается до того, как мы получим кадр из буфера, и сам кадр записывается перед тем, как мы пытаемся отправить индекс в очередь. Если разница в длине составляет всего 1, работник может извлечь индекс очереди из очереди, тогда производитель может увидеть, что он может перейти в очередь сейчас и перейти к следующему кадру, прежде чем работник сможет прочитать кадр. сам. Есть много способов, которыми вы могли бы подойти к этому по-другому, что могло бы позволить меньшему количеству процессов, ожидающих друг друга все время, возможно, используя mp.Event.

...