Как использовать python-trio с буфером протокола Google? - PullRequest
3 голосов
/ 14 июня 2019

Я пытаюсь прочитать некоторые потоки данных, используя protobuf в Python, и я хочу использовать трио, чтобы сделать клиент для чтения потоков.Protobuf имеет некоторые вызовы методов, и я считаю, что они не работают, когда я использую потоки трио.

Клиент Python на компьютере с Linux.

import DTCProtocol_pb2 as Dtc

async def parent(addr, encoding, heartbeat_interval):
    print(f"parent: connecting to 127.0.0.1:{addr[1]}")
    client_stream = await trio.open_tcp_stream(addr[0], addr[1])

    # encoding request
    print("parent: spawing encoding request ...")
    enc_req = create_enc_req(encoding) # construct encoding request
    await send_message(enc_req, Dtc.ENCODING_REQUEST,client_stream, 'encoding request') # send encoding request

    log.debug('get_reponse: started')
    response = await client_stream.receive_some(1024)
    m_size = struct.unpack_from('<H', response[:2]) # the size of message
    m_type = struct.unpack_from('<H', response[2:4]) # the type of the message
    m_body = response[4:]
    m_resp = Dtc.EncodingResponse()

m_body будут некоторые байтовые данные, который я не знаю, как декодировать.Dtc.EncodingResponse() - это метод protobuf, который дает объект Dtc, который содержит ответ в читаемом формате.(Dtc - это файл protobuf).Но я ничего не понимаю здесь.Когда я делал этот сценарий без трио, Dtc.EncodingResponse() давал полный ответ в читаемом формате.

Я предполагаю, что проблема в том, что "client_stream" - это объект потока трио, который читает только байты, и поэтому мне, вероятно, нужно вместо этого использовать объект ReceiveChannel.Но если это правда, я не знаю, как это сделать.

ОБНОВЛЕНИЕ: ответ ниже Натаниэля Дж. Смита решает мою проблему.

m_resp = Dtc.EncodingResponse()
m_resp.ParseFromString(m_body)

Я чувствую себя так глупо, но яне обрабатывал данные ранее, и это было все, что потребовалось.Чрезвычайно благодарен всем, кто дал ответы.Надеюсь, это поможет кому-то там.

1 Ответ

1 голос
/ 15 июня 2019

Как сказал @shmee в комментарии, я думаю, что ваш код несколько искажен из-за правок ... вы должны перепроверить.

Когда я делал этот сценарий без трио, Dtc.EncodingResponse() давал бы полный ответ в читаемом формате

Я думаю, что вы, возможно, бросили черту при переходе на Трио? Dtc.EncodingResponse() просто создает новый пустой EncodingResponse объект. Если вы хотите проанализировать данные из m_body в ваш новый объект, вы должны сделать это явно, с чем-то вроде:

m_resp = Dtc.EncodingResponse()
m_resp.ParseFromString(m_body)

Однако есть еще одна проблема ... причина, по которой она называется receive_some, заключается в том, что она получает некоторые байтов, но может не получить всех байтов, которые вы запрашивали. Ваш код предполагает, что один вызов receive_some извлечет все байты в ответе, и это может быть правдой, когда вы выполняете простой тест, но в целом это не гарантируется. Если вы не получаете достаточно данных при первом вызове receive_some, возможно, вам придется повторять вызов до тех пор, пока вы не получите все данные.

Это на самом деле очень стандартно ... сокеты работают так же. Поэтому первым делом ваш сервер отправляет поле m_size в начале - это так, что вы можете определить, получили ли вы все данные или нет!

К сожалению, по состоянию на июнь 2019 года Trio не предоставляет помощника для выполнения этого цикла - вы можете отслеживать прогресс в этом в этой проблеме . А пока можно написать свой. Я думаю, что-то вроде этого должно работать:

async def receive_exactly(stream, count):
    buf = bytearray()
    while len(buf) < count:
        new_data = await stream.receive_some(count - len(buf))
        if not new_data:
            raise RuntimeError("other side closed the connection unexpectedly")
        buf += new data
    return buf

async def receive_encoding_response(stream):
    header = await receive_exactly(stream, 4)
    (m_size, m_type) = struct.unpack('<HH', header)
    m_body = await receive_exactly(stream, m_size)
    m_resp = Dtc.EncodingResponse()
    m_resp.ParseFromString(m_size)
    return m_resp
...