Как получить construct.GreedyRange, чтобы вернуть байт? - PullRequest
0 голосов
/ 08 ноября 2018

Хорошо, предположим, у меня это работает точно так, как ожидалось:

from enum import IntEnum
from contstruct import *

class Char(IntEnum):
    START = 0xAB
    STOP = 0xBC
    ESC = 0xCD

MAPPING = Mapping(Byte, {x: x+1 for x in Char})

SLIP = GreedyRange(
    Select(
        FocusedSeq(
            'x',
            Const(Char.ESC, Byte), 
            Renamed(MAPPING, 'x')
        ),
        Byte
    )
)

Пример:

>>> buffer = bytes([0x00, 0xAB, 0xBC, 0xCD, 0xFF])
>>> SLIP.build(buffer)
b'\x00\xcd\xac\xcd\xbd\xcd\xce\xff’

И:

>>> from operator import eq
>>> all(map(eq, SLIP.parse(SLIP.build(buffer)), buffer))
True

Теперь мне нужно обернутькодирование / декодирование внутри другой структуры:

PROTOCOL = FocusedSeq(
    'message',
    Const(Char.START, Byte),
    Renamed(SLIP, 'message'),
    Const(Char.STOP, Byte)
)

Сборка работает точно так же, как и ожидалось:

>>> PROTOCOL.build(buffer)
b'\xab\x00\xcd\xac\xcd\xbd\xcd\xce\xff\xbc'

Однако при разборе GreedyRange расходуется 1 слишком много байтов:

>>> PROTOCOL.parse(b'\xab\x00\xcd\xac\xcd\xbd\xcd\xce\xff\xbc')
construct.core.StreamError: stream read less than specified amount, expected 1, found 0

Как мне заставить GreedyRange вернуть байт?

Ответы [ 3 ]

0 голосов
/ 20 ноября 2018

Решением этого является NullTerminated(..., term=STOP), который внутренне буферизует базовый поток и возвращает при необходимости.

PROTOCOL = FocusedSeq(
    'message'
    Const(Char.START, Byte),
    message=NullTerminated(
        # NOTE build consumes entire stream and appends STOP
        # NOTE parse consumes steam until STOP and passes buffer to GreedyRange
        GreedyRange(
            Select(
                FocusedSeq(
                    'x',
                    Const(Char.ESC, Byte),
                    x=MAPPING  # NOTE intentionally raises MappingError
                ),
                Byte  # NOTE fallback for MappingError
            )
        ),
        term=Byte.build(Char.STOP)
    )
)
0 голосов
/ 20 ноября 2018

Еще один способ сделать это - использовать построить класс адаптера для изменения последовательности байтов.

Вот еще один пример кода:

from construct import Byte, Const, FocusedSeq, GreedyRange, \
    If, Mapping, Renamed, Select, this, Adapter
from enum import IntEnum


class Char(IntEnum):
    START = 0xAB
    STOP = 0xBC
    ESC = 0xCD

MAPPING = Mapping(Byte, {x: x+1 for x in Char})

SLIP = GreedyRange(
    Select(
        FocusedSeq(
            'x',
            Const(Char.ESC, Byte),
            Renamed(MAPPING, 'x')
        ),
        Byte
    )
)
buffer = bytes([0x00, 0xAB, 0xBC, 0xCD, 0xFF])

slip_build = SLIP.build(buffer)
assert slip_build == b'\x00\xcd\xac\xcd\xbd\xcd\xce\xff'
slip_parsed = SLIP.parse(b'\x00\xcd\xac\xcd\xbd\xcd\xce\xff')


class ProtocolAdapter(Adapter):
    def _decode(self, obj, context, path):

        # remove first and last bite
        obj.pop(0)
        obj.pop(-1)
        return obj

    def _encode(self, obj, context, path):
        return obj

PROTOCOL = FocusedSeq(
    "message",
    If(this._building == True, Const(Char.START, Byte)),
    "message" / SLIP,
    If(this._building == True, Const(Char.STOP, Byte))

)
ADAPTED_PROTOCOL = ProtocolAdapter(PROTOCOL)

protocol_build = ADAPTED_PROTOCOL.build(buffer)
assert protocol_build == b'\xab\x00\xcd\xac\xcd\xbd\xcd\xce\xff\xbc'
protocol_parsed = ADAPTED_PROTOCOL.parse(protocol_build)
assert protocol_parsed == slip_parsed
0 голосов
/ 18 ноября 2018

В вашем случае вы можете просто переставить поля PROTOCOL и поставить SLIP в конце.

PROTOCOL = FocusedSeq(
    'message',
    Const(Char.START, Byte),
    Const(Char.STOP, Byte),
    Renamed(SLIP, 'message')
)

Таким образом, GreedyRange не будет использовать все байты, которые вызвали ошибку разбора потока: construct.core.StreamError: stream read less than specified amount, expected 1, found 0.

Вот модифицированный образец:

from construct import Byte, Const, FocusedSeq, GreedyRange, Mapping, Renamed, Select
from enum import IntEnum


class Char(IntEnum):
    START = 0xAB
    STOP = 0xBC
    ESC = 0xCD

MAPPING = Mapping(Byte, {x: x+1 for x in Char})

SLIP = GreedyRange(
    Select(
        FocusedSeq(
            'x',
            Const(Char.ESC, Byte),
            Renamed(MAPPING, 'x')
        ),
        Byte
    )
)
buffer = bytes([0x00, 0xAB, 0xBC, 0xCD, 0xFF])

slip_build = SLIP.build(buffer)
assert slip_build == b'\x00\xcd\xac\xcd\xbd\xcd\xce\xff'
slip_parsed = SLIP.parse(b'\x00\xcd\xac\xcd\xbd\xcd\xce\xff')

PROTOCOL = FocusedSeq(
    'message',
    Const(Char.START, Byte),
    Const(Char.STOP, Byte),
    Renamed(SLIP, 'message')
)

protocol_build = PROTOCOL.build(buffer)
assert protocol_build == b'\xab\xbc\x00\xcd\xac\xcd\xbd\xcd\xce\xff'
protocol_parsed = PROTOCOL.parse(protocol_build)
assert protocol_parsed == slip_parsed
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...