Я следую https://github.com/kaiwaehner/kafka-connect-iot-mqtt-connector-example для подключения Mosquitto и Kafka с разъемом источника MQTT.Я получаю данные, отправленные издателем Mosquitto подписчику Mosquitto и потребителю Kafka.Но поле ключа и значения в моем объекте ConsumerRecord объекта kafka-consumer содержит некоторые предварительно добавленные байтовые символы.Ниже приведены фрагменты кода и выходные данные, которые я получаю.
mqttPublisher.py
while v3 < 3:
data3 = {
"time": str(datetime.datetime.now().time()),
"val": v3
}
client.publish("sensor/dist", json.dumps(data3), qos=2)
v3 += 1
time.sleep(2)
mqttSubscriber.py
def on_message_print(client, userdata, message):
print(message.topic,message.payload)
subscribe.callback(on_message_print, "sensor/#", hostname="localhost")
kafkaConsumer.py
consumer = KafkaConsumer('mqtt.',
bootstrap_servers=['localhost:9092'])
for message in consumer:
print(message)
Выход: mqttSubscriber.py
sensor / dist b '{"time":"12: 44: 30.817462", "val": 0} '
sensor / dist b' {"time": "12: 44: 32.820040", "val": 1} '
sensor / dist b '{"time": "12: 44: 34.822657", "val": 2}'
Выход: kafkaConsumer.py
ConsumerRecord (topic = 'mqtt.', Раздел = 0, смещение = 225, timestamp = 1545117270870, timestamp_type = 0, key = b '\ x00 \ x00 \ x00 \ x00 \ x01 \ x16sensor / dist' , value = b '\ x00 \ x00 \ x00 \ x00 \ x02J {"time": "12: 44: 30.817462", "val": 0} ', headers = [('mqtt.message.id ', b'0'), ('mqtt.qos', b'0 '), (' mqtt.retained ', b'false'), ('mqtt.duplicate', b'false ')],контрольная сумма = нет, serialized_key_size = 17, serialized_value_size = 43, serialized_header_size = 62)
ConsumerRecord (topic = 'mqtt.', partition = 0, смещение = 226, отметка времени = 1545117272821, отметка времени = 0, key = b '\ x00 \ x00 \ x00 \ x00 \ x01 \ x16sensor / dist' , value = b '\x00 \ x00 \ x00 \ x00 \ x02J {"время": "12: 44: 32.820040", "val": 1} ', заголовки = [(' mqtt.message.id ', b'0'), ('mqtt.qos', b'0 '), (' mqtt.retained ', b'false'), ('mqtt.duplicate', b'false ')], контрольная сумма = нет, serialized_key_size = 17, serialized_value_size= 43, serialized_header_size = 62)
ConsumerRecord (тема = 'mqtt.', Раздел = 0, смещение = 227, временная метка = 1545117274824, timestamp_type = 0, ключ = b '\ x00 \ x00 \x00 \ x00 \ x01 \ x16sensor / dist ', value = b' \ x00 \ x00 \ x00 \ x00 \ x02J {"time": "12: 44: 34.822657", "val": 2}', headers = [(' mqtt.message.id ', b'0'), ('mqtt.qos', b'0 '), (' mqtt.retained ', b'false'), ('mqtt.duplicate', b'false ')], контрольная сумма = нет, serialized_key_size = 17, serialized_value_size = 43, serialized_header_size = 62)
Что вызывает вышеуказанное добавление дополнительных байтов в Kafka Consumer?Заранее спасибо.