Читая ровно N байтов из именованного канала, подождите, если это число станет доступным - PullRequest
1 голос
/ 23 февраля 2020

У меня есть два процесса A и B. Они используют 2 очереди для связи с другим. Одна очередь запросов, где A пишет, а B читает из; и одна очередь ответа, где B записывает после получения запроса, а A читает ответ. Сообщения выглядят так: [messagelength] [message] в обоих направлениях.

Две очереди - это unix именованные каналы, созданные с помощью os.mkfifo. A создает обе очереди и записывает запрос (в этот момент запись зависает, пока B не откроет очередь для чтения, но это ожидается и работает достаточно хорошо):

request_queue: Path = '~/request.pipe'
response_queue: Path = '~/response.pipe'
os.mkfifo(request_queue.as_posix(), 0o700)
os.mkfifo(response_queue.as_posix(), 0o700)

request: bytes = binascii.hexlify(dill.dumps(...))
request_queue.write_bytes(struct.pack('@I', len(request))+request)
with response_queue.open('rb') as response_stream:
  readers, _, _ = select([response_stream], [], [])
  reader: io.BufferedReader = readers.pop()
  # here's the interesting part. Remember, every message is guaranteed to have 4bytes of length information, followed by the actual response with the given length.
  size: int = struct.unpack('@I', reader.read(4))[0]  # Here, occasionally I get an unpack error, because the reader read less than 4 bytes of data.
  message: Any = dill.dumps(binascii.unhexlify(reader.read(size)))  # other times, I get an EOFError at this point because read also didn't read size amount of bytes.

Остальное не важно. Со стороны B, B пишет в очередь ответов через write_bytes, предоставленный Path, точно так же, как запрос был написан на конце A:

self.response_queue.write_bytes(struct.pack('@I', len(response))+response)

Я отлаживал и отлаживал эти ошибки распаковки и загружал ошибки в прошлом пару часов, но, похоже, что при отладке, шаг за шагом и через проблему нет, потому что чтение получит правильное количество байтов. Не повезло на этом фронте. Тем не менее, мне удалось заставить вещи работать с крошечной функцией чтения, которая на 100% читает необходимое количество байтов. Если первое чтение возвращает меньше, то оно пытается снова и снова. Вопрос, можно ли это сделать как-нибудь лучше?

def read_exactly_n_bytes(reader: io.BufferedReader, size: int):
  buffer: bytes = b''
  while len(buffer) < size:
    buffer = buffer+reader.read(size-len(buffer))
  return buffer

Не могу найти надежного способа ожидания ровно n байтов в именованном канале и затем прочитать эти n байтов.

...