В моем сценарии мне нужно разрешить произвольный доступ к отдельным элементам, сериализованным в msgpack. То есть учитывая двоичный файл и индекс элемента, я хочу перейти именно к этой позиции в файле и десериализовать этот элемент.
Чтобы получить смещение байта для каждого элемента, я использую функцию unpack
mgspack.Unpacker
. В mgspack 0.5 unpack
принимает необязательный аргумент write_bytes
, который является ловушкой, которая вызывается для строки необработанных данных перед сериализацией. Подсчет len
этой строки дает мне размер элемента в байтах, что позволяет мне накапливать смещение в байтах.
Начиная с msgpack 0.6, аргумент write_bytes
больше не принимается, и я не нашел никакой замены, которая дает мне строку ввода raw или количество использованных байтов после чтения элемента.
Вот функция, которую я использую для создания индекса. Функция возвращает индекс в виде списка смещений байтов. Каждая запись index[i]
содержит смещение байта до элемента i
. Критическая часть - это вызов unpacker.unpack(write_bytes=hook)
, который больше не принимает никаких атрибутов.
def index_from_recording(filename):
# create empty index
index = []
# hook that keeps track of the byte offset of the `msgpack.Unpacker`
hook = ByteOffsetHook()
with open(filename, "rb") as f:
# create the `msgpack.Unpacker`
unpacker = msgpack.Unpacker(f)
try:
while True:
# add current offset to index
index.append(hook.offset)
# unpack (and discard) next item.
# The `hook` keeps track of the read bytes
unpacker.unpack(write_bytes=hook) # <== `write_bytes` not accepted since 0.6
except msgpack.OutOfData:
pass
return index
ByteOffsetHook
определяется следующим образом. Хук просто считает len
необработанной входной строки и накапливает ее.
class ByteOffsetHook(object):
def __init__(self):
self.offset = 0
def __call__(self, data):
self.offset += len(data)
Для отладки вы можете использовать эту функцию для создания фиктивной записи.
def serialize_dummy_recording(filename):
with open(filename, "wb") as f:
for serialized_sample in [msgpack.packb({'x': i}) for i in range(10)]:
f.write(serialized_sample)
def main():
filename = "test.rec"
if not os.path.exists(filename):
serialize_dummy_recording(filename)
index = index_from_recording(filename)
print(index)
if __name__ == "__main__":
main()