Распыление среднего потока (питон) - PullRequest
2 голосов
/ 18 января 2012

Я пишу сценарии для обработки (очень больших) файлов путем многократного расщепления объектов до EOF.Я хотел бы разделить файл и иметь отдельные процессы (в облаке), обрабатывать и обрабатывать отдельные части.

Однако мой разделитель не интеллектуален, он не знает о границах между засоленными объектами в файле (так как эти границы зависят от типов пересекаемых объектов и т.способ сканировать файл на предмет «запуска протравленного объекта»?Наивным способом было бы попытаться выполнить отбор при последовательных смещениях байтов до тех пор, пока объект не будет успешно отфильтрован, но это приводит к неожиданным ошибкам.Похоже, что для определенных комбинаций ввода, средство выбора не синхронизируется и ничего не возвращает для остальной части файла (см. Код ниже).

import cPickle
import os

def stream_unpickle(file_obj):
    while True:
        start_pos = file_obj.tell()
        try:
            yield cPickle.load(file_obj)
        except (EOFError, KeyboardInterrupt):
            break
        except (cPickle.UnpicklingError, ValueError, KeyError, TypeError, ImportError):
            file_obj.seek(start_pos+1, os.SEEK_SET)

if __name__ == '__main__':
    import random
    from StringIO import StringIO

    # create some data
    sio = StringIO()
    [cPickle.dump(random.random(), sio, cPickle.HIGHEST_PROTOCOL) for _ in xrange(1000)]
    sio.flush()

    # read from subsequent offsets and find discontinuous jumps in object count
    size = sio.tell()
    last_count = None
    for step in xrange(size):
        sio.seek(step, os.SEEK_SET)
        count = sum(1 for _ in stream_unpickle(file_obj))
        if last_count is None or count == last_count - 1:
            last_count = count
        elif count != last_count:
            # if successful, these should never print (but they do...)
            print '%d elements read from byte %d' % (count, step)
            print '(%d elements read from byte %d)' % (last_count, step-1)
            last_count = count

Ответы [ 3 ]

1 голос
/ 18 января 2012

Модуль pickletools имеет функцию dis , которая показывает коды операций. Он показывает, что существует код операции STOP, который вы можете сканировать:

>>> import pickle, pickletools, StringIO
>>> s = StringIO.StringIO()
>>> pickle.dump('abc', s)
>>> p = s.getvalue()
>>> pickletools.dis(p)
    0: S    STRING     'abc'
    7: p    PUT        0
   10: .    STOP
highest protocol among opcodes = 0

Обратите внимание, что использование кода операции STOP немного сложнее, потому что коды имеют переменную длину, но это может послужить полезной подсказкой о том, где находятся отсечки.

Если вы контролируете шаг травления на другом конце, то вы можете улучшить ситуацию, добавив свой собственный однозначный альтернативный разделитель:

>>> sep = '\xDE\xAD\xBE\xEF'
>>> s = StringIO.StringIO()
>>> pickle.dump('abc', s)
>>> s.write(sep)
>>> pickle.dump([10, 20], s)
>>> s.write(sep)
>>> pickle.dump('def', s)
>>> s.write(sep)
>>> pickle.dump([30, 40], s)
>>> p = s.getvalue()

Перед распаковкой разделить на отдельные соленья с помощью известного сепаратора:

>>> for pick in p.split(sep):
        print pickle.loads(pick)

abc
[10, 20]
def
[30, 40]
0 голосов
/ 20 января 2012

Извините, что отвечаю на мой вопрос, и спасибо @RaymondHettinger за идею добавления часовых.

Вот что сработало для меня.Я создал читателей и писателей, которые используют часовой '#S', за которым следует длина блока данных в начале каждой «записи».Автор должен позаботиться о том, чтобы найти любые вхождения '#' в записываемых данных и удвоить их (до '##').Затем читатель использует регулярное выражение для поиска часовых структур, отличных от любых совпадающих значений, которые могут быть в исходном потоке, а также проверяет число байтов между этим часовым и последующим.менеджер контекста (так что несколько вызовов write () могут быть инкапсулированы в одну запись при необходимости).RecordReader является генератором.

Не уверен, как это на производительность.Любые более быстрые / элегантные решения приветствуются.

import re
import cPickle
from functools import partial
from cStringIO import StringIO

SENTINEL = '#S'

# when scanning look for #S, but NOT ##S
sentinel_pattern = '(?<!#)#S' # uses negative look-behind
sentinel_re = re.compile(sentinel_pattern)
find_sentinel = sentinel_re.search

# when writing replace single # with double ##
write_pattern = '#'
write_re = re.compile(write_pattern)
fix_write = partial(write_re.sub, '##')

# when reading, replace double ## with single #
read_pattern = '##'
read_re = re.compile(read_pattern)
fix_read = partial(read_re.sub, '#') 

class RecordWriter(object):
    def __init__(self, stream):
        self._stream = stream
        self._write_buffer = None

    def __enter__(self):
        self._write_buffer = StringIO()
        return self

    def __exit__(self, et, ex, tb):
        if self._write_buffer.tell():
            self._stream.write(SENTINEL) # start
            cPickle.dump(self._write_buffer.tell(), self._stream, cPickle.HIGHEST_PROTOCOL) # byte length of user's original data
            self._stream.write(fix_write(self._write_buffer.getvalue()))
            self._write_buffer = None
        return False

    def write(self, data):
        if not self._write_buffer:
            raise ValueError("Must use StreamWriter as a context manager")
        self._write_buffer.write(data)

class BadBlock(Exception): pass

def verify_length(block):
    fobj = StringIO(block)
    try:
        stated_length = cPickle.load(fobj)
    except (ValueError, IndexError, cPickle.UnpicklingError):
        raise BadBlock
    data = fobj.read()
    if len(data) != stated_length:
        raise BadBlock
    return data

def RecordReader(stream):
    ' Read one record '
    accum = StringIO()
    seen_sentinel = False
    data = ''
    while True:
        m = find_sentinel(data)
        if not m:
            if seen_sentinel:
                accum.write(data)
            data = stream.read(80)
            if not data:
                if accum.tell():
                    try: yield verify_length(fix_read(accum.getvalue()))
                    except BadBlock: pass
                return
        else:
            if seen_sentinel:
                accum.write(data[:m.start()])
                try: yield verify_length(fix_read(accum.getvalue()))
                except BadBlock: pass
                accum = StringIO()
            else:
                seen_sentinel = True
            data = data[m.end():] # toss

if __name__ == '__main__':
    import random

    stream = StringIO()
    data = [str(random.random()) for _ in xrange(3)]
    # test with a string containing sentinel and almost-sentinel
    data.append('abc12#jeoht38#SoSooihetS#')
    count = len(data)
    for i in data:
        with RecordWriter(stream) as r:
            r.write(i)

    size = stream.tell()
    start_pos = random.random() * size
    stream.seek(start_pos, os.SEEK_SET)
    read_data = [s for s in RecordReader(stream)]
    print 'Original data: ', data
    print 'After seeking to %d, RecordReader returned: %s' % (start_pos, read_data)
0 голосов
/ 18 января 2012

В выбранном файле некоторые коды операций имеют аргумент - значение данных, которое следует за кодом операции.Значения данных различаются по длине и могут содержать байты, идентичные кодам операций.Поэтому, если вы начинаете читать файл с произвольной позиции, у вас нет возможности узнать, смотрите ли вы на код операции или на середину аргумента.Вы должны прочитать файл с начала и проанализировать коды операций.

Я подготовил эту функцию, которая пропускает один файл в файле, т.е. читает его и анализирует коды операций, но не создает объекты.Кажется, немного быстрее, чем cPickle.loads на некоторых файлах, которые у меня есть.Вы могли бы переписать это в C для большей скорости.(после правильной проверки)

Затем вы можете сделать один проход по всему файлу, чтобы получить позицию поиска для каждого маринада.

from pickletools import code2op, UP_TO_NEWLINE, TAKEN_FROM_ARGUMENT1, TAKEN_FROM_ARGUMENT4   
from marshal import loads as mloads

def skip_pickle(f):
    """Skip one pickle from file.

    'f' is a file-like object containing the pickle.

    """
    while True:
        code = f.read(1)
        if not code:
            raise EOFError
        opcode = code2op[code]
        if opcode.arg is not None:
            n = opcode.arg.n
            if n > 0:
                f.read(n)
            elif n == UP_TO_NEWLINE:
                f.readline()
            elif n == TAKEN_FROM_ARGUMENT1:
                n = ord(f.read(1))
                f.read(n)
            elif n == TAKEN_FROM_ARGUMENT4:
                n = mloads('i' + f.read(4))
                f.read(n)
        if code == '.':
            break        
...