Я читаю данные из сокета ZeroMQ, используя pyzmq. Код также использует
websockets
поэтому решил использовать asyncio
для ZeroMQ и на стороне абонента.
Абонент должен прочитать составное сообщение. Вместо использования socket.recv_multipart()
Я решил использовать удобные функции pyzmq
для получения и отправки
import zmq
import zmq.asyncio
context = zmq.asyncio.Context()
socket = context.socket(zmq.SUB)
address = await socket.recv_string()
print(socket.RCVMORE)
clock = await socket.recv_pyobj()
print(socket.RCVMORE)
data = await socket.recv_pyobj()
Издатель также использует вспомогательные функции, не использует asyncio:
socket.send_string('raw', zmq.SNDMORE)
socket.send_pyobj(time.time(), zmq.SNDMORE)
socket.send_pyobj(d.data)
Обычно код работает нормально. Но иногда это терпит неудачу на стороне абонента с исключениями как
EOFError: Ran out of input
или
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x80 in position 0: invalid start byte
Я вполне уверен, что это связано с кусочным чтением в сочетании с
asyncio и может быть решена с помощью await socket.recv_multipart()
. Но я пытаюсь
чтобы получить более глубокое понимание протокола ZeroMQ и Asyncio и будет
хотелось бы знать, почему именно это может не получиться.
Есть идеи?
Единственная нить , которая
может быть связано с этим, не дает хорошего ответа на вопрос, почему это может произойти.
Редактировать: я сталкивался с этим в документах , но не уверен, что это связано:
Составные сообщения
Сообщение ØMQ состоит из 1 или более частей сообщения. Каждое сообщение
часть является независимой zmq_msg_t сама по себе. ØMQ обеспечивает атомарный
доставка сообщений: коллеги должны получать либо все части сообщения
сообщение или вообще ничего.