Полностью потоковый анализатор XML - PullRequest
0 голосов
/ 25 октября 2018

Я пытаюсь использовать веб-сервис Exchange GetAttachment , используя запросы , lxml и base64io .Эта служба возвращает файл в кодировке base64 в ответе HTTP SOAP XML.Содержимое файла содержится в одной строке в одном элементе XML.GetAttachment является лишь примером, но проблема носит более общий характер.

Я бы хотел транслировать содержимое декодированного файла непосредственно на диск, не сохраняя все содержимое вложения в памяти в любой точке, так каквложение может быть несколько 100 МБ.

Я пробовал что-то вроде этого:

r = requests.post('https://example.com/EWS/Exchange.asmx', data=..., stream=True)
with open('foo.txt', 'wb') as f:
    for action, elem in lxml.etree.iterparse(GzipFile(fileobj=r.raw)):
    if elem.tag == 't:Content':
        b64_encoder = Base64IO(BytesIO(elem.text))
        f.write(b64_encoder.read())

, но lxml по-прежнему сохраняет копию вложения как elem.text.Можно ли как-нибудь создать полностью потоковый парсер XML, который также передает содержимое элемента непосредственно из входного потока?

1 Ответ

0 голосов
/ 29 октября 2018

Не используйте iterparse в этом случае.Метод iterparse() может выдавать только начальные и конечные события элемента, поэтому любой текст в элементе передается вам, когда найден закрывающий тег XML.

Вместо этого используйте Интерфейс SAX-парсера .Это общий стандарт для библиотек синтаксического анализа XML для передачи проанализированных данных обработчику содержимого.В ContentHandler.characters() callback передаются символьные данные в виде кусков (при условии, что реализующая библиотека XML фактически использует эту возможность).Это API более низкого уровня от ElementTree API, и стандартная библиотека Python уже связывает парсер Expat для его управления.

Таким образом, поток становится:

  • обернуть входящийпоток запросов в GzipFile для легкой декомпрессии.Или, что еще лучше, установите response.raw.decode_content = True и оставьте распаковку для библиотеки запросов на основе кодировки содержимого, установленной сервером.
  • Передайте экземпляр GzipFile или необработанный поток в .parse()метод анализатора, созданного с помощью xml.sax.make_parser().Затем анализатор переходит к чтению из потока кусками.Используя make_parser(), вы вначале можете включить такие функции, как обработка пространства имен (что гарантирует, что ваш код не прервется, если Exchange решит изменить короткие префиксы, используемые для каждого пространства имен).
  • Метод обработчика содержимого characters() методвызывается с кусками данных XML;проверьте правильность события начала элемента, чтобы вы знали, когда ожидать данные base64.Вы можете декодировать эти данные base64 в виде кусков (кратных) 4 символов за раз и записывать их в файл.Я бы не стал использовать base64io здесь, просто делил бы свои фрагменты.

Простой обработчик контента мог бы быть:

from xml.sax import handler
from base64 import b64decode

class AttachmentContentHandler(handler.ContentHandler):
    types_ns = 'http://schemas.microsoft.com/exchange/services/2006/types'

    def __init__(self, filename):
        self.filename = filename

    def startDocument(self):
        self._buffer = None
        self._file = None

    def startElementNS(self, name, *args):
        if name == (self.types_ns, 'Content'):
            # we can expect base64 data next
            self._file = open(self.filename, 'wb')
            self._buffer = []

    def endElementNS(self, name, *args):
        if name == (self.types_ns, 'Content'):
            # all attachment data received, close the file
            try:
                if self._buffer:
                    raise ValueError("Incomplete Base64 data")
            finally:
                self._file.close()
                self._file = self._buffer = None

    def characters(self, data):
        if self._buffer is None:
            return
        self._buffer.append(data)
        self._decode_buffer()

    def _decode_buffer(self):
        remainder = ''
        for data in self._buffer:
            available = len(remainder) + len(data)
            overflow = available % 4
            if remainder:
                data = (remainder + data)
                remainder = ''
            if overflow:
                remainder, data = data[-overflow:], data[:-overflow]
            if data:
                self._file.write(b64decode(data))
        self._buffer = [remainder] if remainder else []

, и вы бы использовали его так:

import requests
from xml.sax import make_parser, handler

parser = make_parser()
parser.setFeature(handler.feature_namespaces, True)
parser.setContentHandler(AttachmentContentHandler('foo.txt'))

r = requests.post('https://example.com/EWS/Exchange.asmx', data=..., stream=True)
r.raw.decode_content = True  # if content-encoding is used, decompress as we read
parser.parse(r.raw)

Это позволит проанализировать входной XML в виде кусков размером до 64 КБ (по умолчанию IncrementalParser размер буфера ), поэтому данные вложения декодируются не более чем в 48 КБ блоков необработанных данных..

Я бы, вероятно, расширил обработчик содержимого, чтобы взять целевой каталог, а затем искать элементы <t:Name> для извлечения имени файла, а затем использовать его для извлечения данных в правильное имя файла для каждого найденного вложения.Вы также хотели бы убедиться, что вы имеете дело с GetAttachmentResponse документом, и обработать ответы об ошибках.

...