Python Asynio: невозможно запустить параллельную программу Asynio - PullRequest
2 голосов
/ 27 марта 2019

Я использую Asyncio для параллельной обработки 58 камер. Сначала я пытаюсь загрузить все изображения из imread метода opencv python. Я попробовал и то, и другое, обычным способом (последовательно) и с Asynio, но они оба занимают почти одинаковое количество времени.

 async def loadImage(i):
    return base64.b64encode(cv2.imread(my_local_path)).decode('utf-8')

def loadImageSync(i):
    return base64.b64encode(cv2.imread("frames_set/Kamera {}.jpg".format(i))).decode('utf-8')

Основной функционал

async def main():

        starttime_lmSync = time.time()
        lm = [loadImageSync(i) for i in range(1,59)]
        print("Loading Images {}".format(time.time() - starttime_lmSync))

        starttime_lm = time.time()
        lm = [loadImage(i) for i in range(1,59)]
        rawImage = await asyncio.gather(*lm)
        print("Loading Images aSync {}".format(time.time() - starttime_lm))

Выход:

Загрузка изображений 1.320235013961792

Загрузка изображений aSync 1.3253769874572754

Что я делаю не так? Или это ожидается?

После загрузки пакета изображений я хочу декодировать их и преобразовать в массив. Для одного изображения это занимает ~ 0,02 секунды, поэтому, чтобы обработать их все параллельно, я использую asynio

async def process_image(im):
    return  np.asarray(np.frombuffer(base64.b64decode(im), np.uint8),dtype=np.float32).reshape((1080,1920,3))

starttime_process = time.time()
futures = [process_image(img_b64) for img_b64 in rawImage]
res = await asyncio.gather(*futures)
print("total time taken {}".format(time.time() - starttime_process))

вывод

Общее время занято 1.2220990657806396

Опять же, время, которое требуется, почти равно времени последовательного вызова. Есть что-то, чего мне не хватает?

Python версия: 3.7 ОС: Ubuntu 16.04

1 Ответ

2 голосов
/ 27 марта 2019

Ваша функция loadImage не является сопрограммой, поэтому даже если вы использовали asyncio.gather для их параллельного вызова, они заблокируют поток.

Вам необходимо делегировать задачив разные потоки с помощью пула потоков, чтобы каждая сопрограмма выполнялась в отдельном потоке без блокировки друг друга.

Вот один подход с ThreadPoolExecutor:

executor = concurrent.futures.ThreadPoolExecutor(max_workers=60)

Так как большинство потоков будетбыть связанным с вводом / выводом, порождение 60 не должно быть огромной проблемой.

Теперь вы можете изменить свой loadImage как:

async def loadImage(i, event_loop, executor):
    return base64.b64encode(await event_loop.run_in_executor(executor, cv2.imread, my_local_path)).decode('utf-8')

Выше должно получиться loadImage a (сопрограммы), и вы должны увидеть улучшения в скорости выполнения задач, связанных с вводом / выводом.

(Может быть некоторое снижение скорости из-за необходимости порождать столько потоков и выделять ресурсы).

...