буферы протокола Python: десериализация сообщений без загрузки всего файла в память? - PullRequest
1 голос
/ 16 октября 2019

Я использую Google Protocol Buffers и Python для декодирования некоторых больших файлов данных - по 200 МБ каждый. У меня есть код ниже, который показывает, как декодировать поток с разделителями, и он работает просто отлично. Однако он использует команду read(), которая загружает весь файл в память и затем перебирает его.

import feed_pb2 as sfeed
import sys
from google.protobuf.internal.encoder import _VarintBytes
from google.protobuf.internal.decoder import _DecodeVarint32

with open('/home/working/data/feed.pb', 'rb') as f:
    buf = f.read() ## PROBLEM-LOADS ENTIRE FILE TO MEMORY.
    n = 0
    while n < len(buf):
        msg_len, new_pos = _DecodeVarint32(buf, n)
        n = new_pos
        msg_buf = buf[n:n+msg_len]
        n += msg_len
        read_row = sfeed.standard_feed()
        read_row.ParseFromString(msg_buf)
        # do something with read_metric
        print(read_row)

Обратите внимание, что этот код взят из другого SO сообщения, но я не помню точный URL. Мне было интересно, существует ли readlines() эквивалент с буферами протокола, который позволяет мне читать по одному сообщению с разделителями за раз и декодировать его? Я в основном хочу конвейер, который не ограничен оперативной памятью, я должен загрузить файл.

Похоже, что был пакет pystream-protobuf, который поддерживал некоторые из этих функций, но он не обновлялся в течение года или двух. Существует также сообщение от 7 лет назад, в котором задавался аналогичный вопрос. Но мне было интересно, появилась ли какая-либо новая информация с тех пор.

пример python для чтения нескольких сообщений protobuf из потока

1 Ответ

1 голос
/ 18 октября 2019

Если нормально загружать по одному полному сообщению за раз, это довольно просто реализовать, изменив отправленный вами код:

import feed_pb2 as sfeed
import sys
from google.protobuf.internal.encoder import _VarintBytes
from google.protobuf.internal.decoder import _DecodeVarint32

with open('/home/working/data/feed.pb', 'rb') as f:
    buf = f.read(10) # Maximum length of length prefix
    while buf:
        msg_len, new_pos = _DecodeVarint32(buf, n)
        buf = buf[new_pos:]
        # read rest of the message
        msg_buf += f.read(msg_len - len(buf))
        read_row = sfeed.standard_feed()
        read_row.ParseFromString(msg_buf)
        msg_buf = msg_buf[msg_len:]
        # do something with read_metric
        print(read_row)
        # read length prefix for next message
        buf += f.read(10 - len(buf))

Это читает 10 байтов, что достаточно для анализа длиныпрефикс, а затем читает остальную часть сообщения, как только его длина известна.

Строковые мутации не очень эффективны в Python (они делают много копий данных), поэтому использование bytearray может повысить производительностьесли ваши индивидуальные сообщения также большие.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...