Python - объединение многопроцессорности и асинхронности - PullRequest
0 голосов
/ 03 июля 2018

Я пытаюсь совместить многопроцессорность с asyncio. Программа имеет два основных компонента - один для потоковой передачи / генерирования контента, а другой для его потребления.

То, что я хочу сделать, - это создать несколько процессов для использования нескольких ядер ЦП - одно для потокового прослушивателя / генератора, другое для потребителя и простое, чтобы отключить все, когда потребитель остановился.

До сих пор я подходил к созданию процессов и их запуску. Каждый такой процесс создает асинхронную задачу. После запуска всех процессов я запускаю асинхронные задачи. То, что я до сих пор (урезанный) это:

def consume_task(loop, consumer):
    loop.create_task(consume_queue(consumer))

def stream_task(loop, listener, consumer):
    loop.create_task(create_stream(listener, consumer))

def shutdown_task(loop, listener):
    loop.create_task(shutdown(consumer))

async def shutdown(consumer):
    print("Shutdown task created")
    while not consumer.is_stopped():
        print("No activity")
        await asyncio.sleep(5)
    print("Shutdown initiated")
    loop.stop()

async def create_stream(listener, consumer):
    stream = Stream(auth, listener)
    print("Stream created")
    stream.filter(track=KEYWORDS, is_async=True)
    await asyncio.sleep(EVENT_DURATION)
    print("Stream finished")
    consumer.stop()

async def consume_queue(consumer):
    await consumer.run()

loop = asyncio.get_event_loop()

p_stream = Process(target=stream_task, args=(loop, listener, consumer, ))
p_consumer = Process(target=consume_task, args=(loop, consumer, ))
p_shutdown = Process(target=shutdown_task, args=(loop, consumer, ))
p_stream.start()
p_consumer.start()
p_shutdown.start()

loop.run_forever()
loop.close()

Проблема в том, что все зависает (или оно блокируется?) - никакие задачи на самом деле не выполняются. Моим решением было изменить первые три функции на:

def consume_task(loop, consumer):
    loop.create_task(consume_queue(consumer))
    loop.run_forever()

def stream_task(loop, listener, consumer):
    loop.create_task(create_stream(listener, consumer))
    loop.run_forever()

def shutdown_task(loop, listener):
    loop.create_task(shutdown(consumer))
    loop.run_forever()

Это действительно работает. Однако объекты consumer и listener не могут взаимодействовать. В качестве простого примера, когда функция create_stream вызывает consumer.stop(), потребитель не останавливается. Даже когда я изменяю переменную класса consumer, изменения не вносятся - в данном случае общая очередь остается пустой. Вот как я создаю экземпляры:

queue = Queue()
consumer = PrintConsumer(queue)
listener = QueuedListener(queue, max_time=EVENT_DURATION)

Обратите внимание, что, если я не использую процессы, а только асинхронные задачи, все работает как положено, поэтому я не думаю, что это справочная проблема:

loop = asyncio.get_event_loop()
stream_task(loop, listener, consumer)
consume_task(loop, consumer)
shutdown_task(loop, listener)
loop.run_forever()
loop.close()

Это потому, что они работают в разных процессах? Как мне решить проблему, пожалуйста?

1 Ответ

0 голосов
/ 03 июля 2018

Нашел проблему! Мультиобработка создает копии экземпляров. Решение состоит в том, чтобы создать Manager , который совместно использует экземпляры.

...