Совместное использование asyncio.Queue с другим потоком или процессом - PullRequest
0 голосов
/ 25 января 2019

Я недавно преобразовал свою старую программу сопоставления шаблонов в asyncio, и у меня возникла ситуация, когда одна из моих сопрограмм использует метод блокировки (processing_frame).

Я хочу запускать этот метод в отдельном потоке или процессе всякий раз, когда сопрограмма , вызывающая этот метод (analyze_frame), получает элемент из общего asyncio.Queue()

Я не уверен, возможно ли это или стоит ли это с точки зрения производительности, поскольку у меня очень мало опыта в многопоточности и многопроцессорности

import cv2
import datetime
import argparse
import os
import asyncio

#   Making CLI
if not os.path.exists("frames"):
    os.makedirs("frames")

t0 = datetime.datetime.now()
ap = argparse.ArgumentParser()
ap.add_argument("-v", "--video", required=True,
                help="path to our file")
args = vars(ap.parse_args())

threshold = .2
death_count = 0
was_found = False
template = cv2.imread('youdied.png')
vidcap = cv2.VideoCapture(args["video"])

loop = asyncio.get_event_loop()
frames_to_analyze = asyncio.Queue()


def main():
    length = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT))
    tasks = []
    for _ in range(int(length / 50)):
        tasks.append(loop.create_task(read_frame(50, frames_to_analyze)))
        tasks.append(loop.create_task(analyze_frame(threshold, template, frames_to_analyze)))
    final_task = asyncio.gather(*tasks)
    loop.run_until_complete(final_task)

    dt = datetime.datetime.now() - t0
    print("App exiting, total time: {:,.2f} sec.".format(dt.total_seconds()))

    print(f"Deaths registered: {death_count}")


async def read_frame(frames, frames_to_analyze):
    global vidcap
    for _ in range(frames-1):
        vidcap.grab()

    else:
        current_frame = vidcap.read()[1]
    print("Read 50 frames")
    await frames_to_analyze.put(current_frame)


async def analyze_frame(threshold, template, frames_to_analyze):
    global vidcap
    global was_found
    global death_count
    frame = await frames_to_analyze.get()
    is_found = processing_frame(frame)
    if was_found and not is_found:
        death_count += 1
        await writing_to_file(death_count, frame)
    was_found = is_found


def processing_frame(frame):
    res = cv2.matchTemplate(frame, template, cv2.TM_CCOEFF_NORMED)
    max_val = cv2.minMaxLoc(res)[1]
    is_found = max_val >= threshold
    print(is_found)
    return is_found


async def writing_to_file(death_count, frame):
    cv2.imwrite(f"frames/frame{death_count}.jpg", frame)

if __name__ == '__main__':
    main()

Я пытался использовать unsync , но без особого успеха
Я бы получил что-то вроде

с self._rlock:
PermissionError: [WinError 5] Доступ запрещен

1 Ответ

0 голосов
/ 26 января 2019

Если processing_frame является функцией блокировки, вы должны вызывать ее с помощью await loop.run_in_executor(None, processing_frame, frame). Это передаст функцию в пул потоков и позволит циклу событий продолжить выполнение других действий до завершения функции вызова.

То же самое касается вызовов, таких как cv2.imwrite. Как написано, writing_to_file является , а не действительно асинхронным, несмотря на то, что определено с async def. Это потому, что он ничего не ждет, поэтому, как только его выполнение начнется, он продолжится до конца, даже не останавливаясь. В этом случае можно было бы также сделать его нормальной функцией, во-первых, чтобы было понятно, что происходит.

...