Цель: Чтобы безопасно отменить отложенные задачи во всех потоках.Затем благополучно завершите все циклы asyncio во всех потоках.
Объяснение кода: Я открыл два потока, один для запуска сервера, а другой для фоновой обработки.Каждый поток имеет свои отдельные циклы asyncio.
Желаемая функция: Когда я получаю от клиента сообщение с именем onClose
, я хочу немедленно безопасно завершить все процессы во всех потоках.Требуемая функция находится в функции func_websocket_connection()
после print('Running On Close Function')
Методы работы: Конечно, я пытался os._exit(0)
внезапно остановить все.Он выполняет то, что я хочу, но я также знаю, что это небезопасно и может повредить обработку данных.Я также пытался
print('Running On Close Function')
loop = asyncio.get_event_loop()
loop = loop.stop()
, который также работает, но я получаю Task was destroyed but it is pending!
Несерверный код:
import asyncio
import websockets
from threading import Thread
from time import sleep
#=============================================
# Open Websocket Server:
def func_run_server():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
asyncio.ensure_future(func_websocket_connection())
loop.run_forever()
# Background Processing Function 1:
async def func_websocket_connection():
while i in range(100):
await asyncio.sleep(0.5)
print('Run Step Server 0')
#Some If Statement
if i == 10:
print('Running On Close Function')
#=============================================
# Open Background Processing:
def func_run_background():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
future_run_step0 = asyncio.ensure_future(func_run_step0())
future_run_step1 = asyncio.ensure_future(func_run_step1())
loop.run_until_complete(future_run_step0)
loop.run_until_complete(future_run_step1)
# Background Processing Function 1:
async def func_run_step0():
await asyncio.sleep(5.0)
print('Run Step 0')
# Background Processing Function 2:
async def func_run_step1():
await asyncio.sleep(5.0)
print('Run Step 1')
#================================================================================
#Running two separate threads
Thread(target=func_run_server).start()
Thread(target=func_run_background).start()