Я пытаюсь совместить многопроцессорность с asyncio. Программа имеет два основных компонента - один для потоковой передачи / генерирования контента, а другой для его потребления.
То, что я хочу сделать, - это создать несколько процессов для использования нескольких ядер ЦП - одно для потокового прослушивателя / генератора, другое для потребителя и простое, чтобы отключить все, когда потребитель остановился.
До сих пор я подходил к созданию процессов и их запуску. Каждый такой процесс создает асинхронную задачу. После запуска всех процессов я запускаю асинхронные задачи. То, что я до сих пор (урезанный) это:
def consume_task(loop, consumer):
loop.create_task(consume_queue(consumer))
def stream_task(loop, listener, consumer):
loop.create_task(create_stream(listener, consumer))
def shutdown_task(loop, listener):
loop.create_task(shutdown(consumer))
async def shutdown(consumer):
print("Shutdown task created")
while not consumer.is_stopped():
print("No activity")
await asyncio.sleep(5)
print("Shutdown initiated")
loop.stop()
async def create_stream(listener, consumer):
stream = Stream(auth, listener)
print("Stream created")
stream.filter(track=KEYWORDS, is_async=True)
await asyncio.sleep(EVENT_DURATION)
print("Stream finished")
consumer.stop()
async def consume_queue(consumer):
await consumer.run()
loop = asyncio.get_event_loop()
p_stream = Process(target=stream_task, args=(loop, listener, consumer, ))
p_consumer = Process(target=consume_task, args=(loop, consumer, ))
p_shutdown = Process(target=shutdown_task, args=(loop, consumer, ))
p_stream.start()
p_consumer.start()
p_shutdown.start()
loop.run_forever()
loop.close()
Проблема в том, что все зависает (или оно блокируется?) - никакие задачи на самом деле не выполняются. Моим решением было изменить первые три функции на:
def consume_task(loop, consumer):
loop.create_task(consume_queue(consumer))
loop.run_forever()
def stream_task(loop, listener, consumer):
loop.create_task(create_stream(listener, consumer))
loop.run_forever()
def shutdown_task(loop, listener):
loop.create_task(shutdown(consumer))
loop.run_forever()
Это действительно работает. Однако объекты consumer
и listener
не могут взаимодействовать. В качестве простого примера, когда функция create_stream
вызывает consumer.stop()
, потребитель не останавливается. Даже когда я изменяю переменную класса consumer
, изменения не вносятся - в данном случае общая очередь остается пустой. Вот как я создаю экземпляры:
queue = Queue()
consumer = PrintConsumer(queue)
listener = QueuedListener(queue, max_time=EVENT_DURATION)
Обратите внимание, что, если я не использую процессы, а только асинхронные задачи, все работает как положено, поэтому я не думаю, что это справочная проблема:
loop = asyncio.get_event_loop()
stream_task(loop, listener, consumer)
consume_task(loop, consumer)
shutdown_task(loop, listener)
loop.run_forever()
loop.close()
Это потому, что они работают в разных процессах? Как мне решить проблему, пожалуйста?