Начиная с страницы ctypes doc: [Python]: ctypes - библиотека сторонних функций для Python .
Здесь проблема с дизайном. Представьте себе следующий сценарий: другая часть отправляет заголовок + данные (2 раза), например:
- 15 байтов (
data_len
= 15) и 15 дополнительных байтов после тех, которые соответствуют struct
- 25 байтов (
data_len
= 25) и 25 дополнительных байтов после тех, которые соответствуют struct
Вы не можете смоделировать этот сценарий с CommonMessage
, определенным статически . Что вам нужно сделать, это разбить его на 2 ( header и data ) разделы, и каждый раз, когда вы хотите получить сообщение:
- Читать заголовок
- Смотрите в шапке
data_len
- Считать точно
data_len
байтов в качестве данных
Это общее решение, которое работает для всех таких случаев. Если в вашей ситуации есть особенности, можно использовать другие подходы - которые либо проще (из кода PoV ), либо быстрее - но они будут работать только для этих особенностей. Но поскольку в вопросе не упоминаются такие особенности (это даже не mcve -
поскольку код не компилируется OOTB ), я собираюсь придерживаться общего решения.
Обратите внимание, что это то же самое, что и [SO]: многократная отправка с сервера на клиентский питон
(@ Ответ КристиФати) .
code.py
import sys
import ctypes
# b"\xcc\xdd\xee\xff~Y\xd4\x0b\x02\xb9\xa9\x00i\x02\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00" # Original buffer
INPUT_BUFFER = b"\xcc\xdd\xee\xff~Y\xd4\x0b\x02\xb9\xa9\x00i\x02\x00\x00\x01\x00\x00\x00\x0F\x00\x00\x00\x01\x02\x03\x04\x05Dummy TextIGNORED PART" # At the end, there are bytes for data
# ^ - Modified data length to 15
class CommonMessageHeader(ctypes.Structure):
_pack_ = 1
_fields_ = [
("sof", ctypes.c_uint),
("request_id", ctypes.c_uint),
("interface", ctypes.c_uint),
("msg_type", ctypes.c_uint),
("response", ctypes.c_uint),
("data_len", ctypes.c_int),
]
def print_ctypes_instance_info(obj, indent="", metadata="", max_array_items=10, indent_increment=" "):
if metadata:
print("{:s}{:s}: {:}".format(indent, metadata, obj))
else:
print("{:s}[{:}]: {:}".format(indent, type(obj), obj))
if isinstance(obj, ctypes.Structure):
for field_name, field_type in obj._fields_:
print_ctypes_instance_info(getattr(obj, field_name),indent=indent + indent_increment, metadata="[{:}] {:s}".format(field_type, field_name))
elif isinstance(obj, ctypes.Array):
if max_array_items < len(obj):
items_len, extra_line = max_array_items, True
else:
items_len, extra_line = len(obj), False
for idx in range(items_len):
print_ctypes_instance_info(obj[idx], indent=indent + indent_increment, metadata="[{:d}]".format(idx))
if extra_line:
print("{:s}...".format(indent + indent_increment))
def get_instance_from_buffer(buffer):
header_len = ctypes.sizeof(CommonMessageHeader)
if header_len > len(buffer):
raise ValueError(-1, "Buffer smaller than header")
header = CommonMessageHeader.from_buffer_copy(buffer)
required_buf_len = header_len + header.data_len
if required_buf_len > len(buffer):
raise ValueError(-2, "Buffer smaller than header and data")
class CommonMessage(ctypes.Structure):
_pack_ = 1
_fields_ = [
("header", CommonMessageHeader),
("data", ctypes.c_ubyte * header.data_len)
]
return CommonMessage.from_buffer_copy(buffer), required_buf_len
def main():
print("sizeof(CommonMessageHeader): {:d}".format(ctypes.sizeof(CommonMessageHeader)))
print("Input buffer len: {:d}\n".format(len(INPUT_BUFFER)))
inst, inst_len = get_instance_from_buffer(INPUT_BUFFER)
print_ctypes_instance_info(inst)
print("\nOutput data as a string ({:d}): [{:s}]".format(len(inst.data), "".join([repr(chr(item))[1:-1] for item in inst.data])))
print("The rest of the buffer: [{:}]".format(INPUT_BUFFER[inst_len:]))
if __name__ == "__main__":
print("Python {:s} on {:s}\n".format(sys.version, sys.platform))
main()
Примечания
- Самая большая функция (
print_ctypes_instance_info
) используется только для печати объекта
- Я немного изменил ваш оригинальный буфер, чтобы на самом деле было с чем работать
get_instance_from_buffer
- это функция, которая пытается преобразовать существующий буфер в экземпляр CommonMessage
(который отличается от того, что в вопросе, который теперь CommonMessageHeader
):
- Ожидается буфер (это означает, что он уже прочитан). Это потому, что я не хотел создавать целое приложение сервер / клиент и обмениваться сообщениями. На самом деле я избавился от всего кода, связанного с сокетами (поскольку протокол, который я пытаюсь проиллюстрировать, на самом деле не зависит от него).
Вероятно, окончательная реализация должна полагаться на сокет, а не на буфер, и считывать из него только необходимое количество данных
- Класс
CommonMessage
создается на лету для каждого сообщения (оно не является глобальным, как, например, CommonMessageHeader
), поскольку ctypes.Structure._fields_
можно установить только один раз
- Если буфер слишком маленький
ValueError
увеличивается
- Возвращаемое значение 2 nd - это количество байтов, которые были "использованы" для этого экземпляра; например, если входной буфер длиннее, остальные могут принадлежать следующему сообщению
выход
(py35x64_test) e:\Work\Dev\StackOverflow\q051086829>"e:\Work\Dev\VEnvs\py35x64_test\Scripts\python.exe" code.py
Python 3.5.4 (v3.5.4:3f56838, Aug 8 2017, 02:17:05) [MSC v.1900 64 bit (AMD64)] on win32
sizeof(CommonMessageHeader): 24
Input buffer len: 51
[<class '__main__.get_instance_from_buffer.<locals>.CommonMessage'>]: <__main__.get_instance_from_buffer.<locals>.CommonMessage object at 0x000002106F1BBBC8>
[<class '__main__.CommonMessageHeader'>] header: <__main__.CommonMessageHeader object at 0x000002106F1BBB48>
[<class 'ctypes.c_ulong'>] sof: 4293844428
[<class 'ctypes.c_ulong'>] request_id: 198465918
[<class 'ctypes.c_ulong'>] interface: 11122946
[<class 'ctypes.c_ulong'>] msg_type: 617
[<class 'ctypes.c_ulong'>] response: 1
[<class 'ctypes.c_long'>] data_len: 15
[<class '__main__.c_ubyte_Array_15'>] data: <__main__.c_ubyte_Array_15 object at 0x000002106F1BBB48>
[0]: 1
[1]: 2
[2]: 3
[3]: 4
[4]: 5
[5]: 68
[6]: 117
[7]: 109
[8]: 109
[9]: 121
...
Output data as a string (15): [\x01\x02\x03\x04\x05Dummy Text]
The rest of the buffer: [b'IGNORED PART']