Я пытаюсь реализовать стек http / 2 на своем собственном сервере приложений, который я создал с нуля, используя asyncio.Насколько я понимаю, asyncio поддерживает внутреннюю очередь «задач», которая используется циклом событий для запуска задач.Теперь, чтобы реализовать приоритезацию потоков, мне нужно иметь возможность запускать задачи с высоким приоритетом в течение более длительного периода времени, чем задачи с низким приоритетом (в силу задач, о которых я думаю, сопрограмма, возвращаемая при вызове приложения (scope, receive, send) каксогласно спецификации ASGI .) Я не могу найти способ расставить приоритеты для этой внутренней очереди, используемой asyncio.
Я даже думал о захвате диктов событий, которые я получаю какаргумент для отправки, вызываемой в приложении (scope, receive, send), но спецификация asgi говорит: « Серверы протоколов должны сбрасывать любые данные, передаваемые им, в буфер отправки перед возвратом из вызова send ».Что подразумевается под «буфером отправки» здесь?Это буфер отправки ОС / ядра?
Думал ли я о приоритезации потоков в неправильном смысле?Что было бы хорошим подходом для реализации этого?
class Worker(object):
def get_asgi_event_dict(self, frame):
event_dict = {
"type": "http",
"asgi": {"version": "2.0", "spec_version": "2.1"},
"http_version": "2",
"method": frame.get_method(),
"scheme": "https",
"path": frame.get_path(),
"query_string": "",
"headers": [
[k.encode("utf-8"), v.encode("utf-8")] for k, v in frame.headers.items()
],
}
return event_dict
async def handle_request(self):
try:
while True:
self.request_data = self.client_connection.recv(4096)
self.frame = self.parse_request(self.request_data)
if isinstance(self.frame, HeadersFrame):
if self.frame.end_stream:
current_stream = Stream(
self.connection_settings,
self.header_encoder,
self.header_decoder,
self.client_connection,
)
current_stream.stream_id = self.frame.stream_id
asgi_scope = self.get_asgi_event_dict(self.frame)
current_stream.asgi_app = self.application(asgi_scope)
# The next line puts the coroutine obtained from a call to
# current_stream.asgi_app on the "tasks" queue and hence
# out of my control to prioritize.
await current_stream.asgi_app(
self.trigger_asgi_application, current_stream.send_response
)
else:
self.asgi_scope = self.get_asgi_event_dict(self.frame)
except Exception:
print("Error occurred in handle_request")
print((traceback.format_exc()))
class Stream(object):
async def send_response(self, event):
# converting the event dict into http2 frames and sending them to the client.