У меня есть два процесса A и B. Они используют 2 очереди для связи с другим. Одна очередь запросов, где A пишет, а B читает из; и одна очередь ответа, где B записывает после получения запроса, а A читает ответ. Сообщения выглядят так: [messagelength] [message] в обоих направлениях.
Две очереди - это unix именованные каналы, созданные с помощью os.mkfifo. A создает обе очереди и записывает запрос (в этот момент запись зависает, пока B не откроет очередь для чтения, но это ожидается и работает достаточно хорошо):
request_queue: Path = '~/request.pipe'
response_queue: Path = '~/response.pipe'
os.mkfifo(request_queue.as_posix(), 0o700)
os.mkfifo(response_queue.as_posix(), 0o700)
request: bytes = binascii.hexlify(dill.dumps(...))
request_queue.write_bytes(struct.pack('@I', len(request))+request)
with response_queue.open('rb') as response_stream:
readers, _, _ = select([response_stream], [], [])
reader: io.BufferedReader = readers.pop()
# here's the interesting part. Remember, every message is guaranteed to have 4bytes of length information, followed by the actual response with the given length.
size: int = struct.unpack('@I', reader.read(4))[0] # Here, occasionally I get an unpack error, because the reader read less than 4 bytes of data.
message: Any = dill.dumps(binascii.unhexlify(reader.read(size))) # other times, I get an EOFError at this point because read also didn't read size amount of bytes.
Остальное не важно. Со стороны B, B пишет в очередь ответов через write_bytes, предоставленный Path, точно так же, как запрос был написан на конце A:
self.response_queue.write_bytes(struct.pack('@I', len(response))+response)
Я отлаживал и отлаживал эти ошибки распаковки и загружал ошибки в прошлом пару часов, но, похоже, что при отладке, шаг за шагом и через проблему нет, потому что чтение получит правильное количество байтов. Не повезло на этом фронте. Тем не менее, мне удалось заставить вещи работать с крошечной функцией чтения, которая на 100% читает необходимое количество байтов. Если первое чтение возвращает меньше, то оно пытается снова и снова. Вопрос, можно ли это сделать как-нибудь лучше?
def read_exactly_n_bytes(reader: io.BufferedReader, size: int):
buffer: bytes = b''
while len(buffer) < size:
buffer = buffer+reader.read(size-len(buffer))
return buffer
Не могу найти надежного способа ожидания ровно n байтов в именованном канале и затем прочитать эти n байтов.