Битовая маска Python (переменной длины) - PullRequest
3 голосов
/ 07 февраля 2011

Чтобы решить одну исследовательскую задачу, мы должны организовать поиск битовых масок в python.В качестве входных данных у нас есть необработанные данные (мы представляем их как последовательность битов).Размер чего-то около 1,5 ГБ.в качестве вывода мы должны получить количество вхождений определенной битовой маски.Позвольте мне привести пример, чтобы описать ситуацию

input:    sequence of bits, a bitmask to search(mask length: 12bits)

. Первая идея (неэффективная) состоит в том, чтобы использовать XOR следующим образом:

1step: from input we take 12 first bits(position 0 to 11) and make XOR with mask 
2step: from input we take bits from 1 to 12 position and XOR with mask ...

Давайте продолжим два первых шага:

input sequence 100100011110101010110110011010100101010110101010
mask to search: 100100011110
step 1: take first 12 bits from input: 100100011110 and XOR it with mask.
step 2: teke bits from 1 to 12position: 001000111101 and XOR it with mask.
...

проблема в том, как организовать получение битов со входа?мы можем взять первые 12 битов, но как мы можем взять биты от 1 до 12 для тех позиций, которые нам нужны для следующей итерации?

Прежде чем использовать пакет Python BitString, но время, которое мы тратим на поиск всехМаска очень высокая.И еще один.Размер маски может быть от 12 бит до 256. Есть предложения?задача должна быть реализована в python

Ответы [ 3 ]

4 голосов
/ 07 февраля 2011

Ваш алгоритм - наивный способ поиска «строк» ​​в данных, но, к счастью, есть намного лучшие алгоритмы. Одним из примеров является алгоритм KMP , но есть и другие, которые могут лучше соответствовать вашему варианту использования.

С помощью лучшего алгоритма вы можете перейти от сложности O(n*m) к O(n+m).

2 голосов
/ 07 февраля 2011

Если ваша маска кратна 8 битам, ваш поиск становится сравнительно тривиальным сравнением байтов и будет достаточен любой алгоритм поиска подстроки (я бы не рекомендовал бы преобразовать в строку и используя встроенный поиск, так как вы, скорее всего, будете страдать от проблем с проверкой символов).

sequence = <list of 8-bit integers>
mask = [0b10010001, 0b01101101]
matches = my_substring_search(sequence, mask)

Для маски больше 8 бит, но не кратной восьми, я бы предложил обрезать маску до кратного 8 и использовать тот же поиск подстроки, что и выше. Затем для любых найденных совпадений вы можете проверить остаток.

sequence = <list of 8-bit integers>
mask_a = [0b10010001]
mask_b = 0b01100000
mask_b_pattern = 0b11110000   # relevant bits of mask_b
matches = my_substring_search(sequence, mask_a)

for match in matches:
    if (sequence[match+len(mask_a)] & mask_b_pattern) == mask_b:
        valid_match = True  # or something more useful...

Если sequence представляет собой список из 4096 байтов, вам может потребоваться учитывать перекрытие между разделами. Это можно легко сделать, составив sequence список 4096+ceil(mask_bits/8.0) байтов, но при этом каждый раз, когда вы читаете следующий блок, он увеличивается только на 4096.


В качестве демонстрации создания и использования этих масок:

class Mask(object):
    def __init__(self, source, source_mask):
        self._masks = list(self._generate_masks(source, source_mask))

    def match(self, buffer, i, j):
        return any(m.match(buffer, i, j) for m in self._masks)

    class MaskBits(object):
        def __init__(self, pre, pre_mask, match_bytes, post, post_mask):
            self.match_bytes = match_bytes
            self.pre, self.pre_mask = pre, pre_mask
            self.post, self.post_mask = post, post_mask

        def __repr__(self):
            return '(%02x %02x) (%s) (%02x %02x)' % (self.pre, self.pre_mask,
                ', '.join('%02x' % m for m in self.match_bytes),
                self.post, self.post_mask)

        def match(self, buffer, i, j):
            return (buffer[i:j] == self.match_bytes and
                    buffer[i-1] & self.pre_mask == self.pre and
                    buffer[j] & self.post_mask == self.post)

    def _generate_masks(self, src, src_mask):
        pre_mask = 0
        pre = 0
        post_mask = 0
        post = 0
        while pre_mask != 0xFF:
            src_bytes = []
            for i in (24, 16, 8, 0):
                if (src_mask >> i) & 0xFF == 0xFF:
                    src_bytes.append((src >> i) & 0xFF)
                else:
                    post_mask = (src_mask >> i) & 0xFF
                    post = (src >> i) & 0xFF
                    break
            yield self.MaskBits(pre, pre_mask, src_bytes, post, post_mask)
            pre += pre
            pre_mask += pre_mask
            if src & 0x80000000: pre |= 0x00000001
            pre_mask |= 0x00000001
            src = (src & 0x7FFFFFFF) * 2
            src_mask = (src_mask & 0x7FFFFFFF) * 2

Этот код не является полным алгоритмом поиска, он является частью проверочных совпадений. Объект Mask создается с исходным значением и исходной маской, выровненными по левому краю и (в этой реализации) длиной 32 бита:

src = 0b11101011011011010101001010100000
src_mask = 0b11111111111111111111111111100000

Буфер представляет собой список байтовых значений:

buffer_1 = [0x7e, 0xb6, 0xd5, 0x2b, 0x88]

Объект Mask генерирует внутренний список смещенных масок:

>> m = Mask(src, src_mask)
>> m._masks
[(00 00) (eb, 6d, 52) (a0 e0),
 (01 01) (d6, da, a5) (40 c0),
 (03 03) (ad, b5, 4a) (80 80),
 (07 07) (5b, 6a, 95) (00 00),
 (0e 0f) (b6, d5) (2a fe),
 (1d 1f) (6d, aa) (54 fc),
 (3a 3f) (db, 54) (a8 f8),
 (75 7f) (b6, a9) (50 f0)]

Средний элемент - это подстрока с точным соответствием (нет аккуратного способа получить это из этого объекта как есть, но это m._masks[i].match_bytes). После того, как вы использовали эффективный алгоритм для поиска этой подпоследовательности, вы можете проверить окружающие биты, используя m.match(buffer, i, j), где i - индекс первого совпадающего байта, а j - индекс байта после последнего совпадающего байта ( такой что buffer[i:j] == match_bytes).

В buffer выше, битовая последовательность может быть найдена начиная с бита 5, что означает, что _masks[4].match_bytes можно найти в buffer[1:3]. В результате:

>> m.match(buffer, 1, 3)
True

(Не стесняйтесь использовать, адаптировать, модифицировать, продавать или пытать этот код любым возможным способом. Мне очень понравилось собирать его - интересная проблема - хотя я не буду нести ответственность за любые ошибки, поэтому убедитесь, что вы проверить это тщательно!)

1 голос
/ 07 февраля 2011

Поиск битовых комбинаций в байтовых данных немного сложнее, чем при обычном поиске.Обычные алгоритмы не всегда работают хорошо, поскольку существует необходимость извлечения каждого бита из байтовых данных, и существует только «алфавит» из двух символов, так что случайно совпадет 50% сравнений (это делает многие алгоритмы оченьменее эффективно).

Вы упоминали, что пытались использовать модуль цепочки битов (который я написал), но он был слишком медленным.Это не слишком удивительно для меня, поэтому, если у кого-то есть какие-то отличные идеи о том, как это сделать, я обращаю внимание!Но способ, используемый для цепочки битов, предполагает возможное ускорение:

Для сопоставления цепочек битов преобразуются куски байтовых данных в обычные строки '0 и' 1, а затем используется метод Python findсделать быстрый поиск.Много времени тратится на преобразование данных в строку, но поскольку вы выполняете поиск в одних и тех же данных несколько раз, возникает большая экономия.

masks = ['0000101010100101', '010100011110110101101', '01010101101']
byte_data_chunk = bytearray('blahblahblah')
# convert to a string with one character per bit
# uses lookup table not given here!
s = ''.join(BYTE_TO_BITS[x] for x in byte_data_chunk)
for mask in masks:
    p = s.find(mask)
    # etc.

Дело в том, что после преобразования вОбычную строку вы можете использовать встроенную find, которая очень хорошо оптимизирована, для поиска каждой из ваших масок.Когда вы использовали цепочку битов, она должна была выполнять преобразование для каждой маски, которая могла бы снизить производительность.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...