MQTT Kafka Source соединитель: смешные байтовые символы - PullRequest
0 голосов
/ 18 декабря 2018

Я следую https://github.com/kaiwaehner/kafka-connect-iot-mqtt-connector-example для подключения Mosquitto и Kafka с разъемом источника MQTT.Я получаю данные, отправленные издателем Mosquitto подписчику Mosquitto и потребителю Kafka.Но поле ключа и значения в моем объекте ConsumerRecord объекта kafka-consumer содержит некоторые предварительно добавленные байтовые символы.Ниже приведены фрагменты кода и выходные данные, которые я получаю.

mqttPublisher.py

while v3 < 3:
             data3 = {
                      "time": str(datetime.datetime.now().time()),
                       "val": v3
                      }
             client.publish("sensor/dist", json.dumps(data3), qos=2)

             v3 += 1
             time.sleep(2)

mqttSubscriber.py

def on_message_print(client, userdata, message):
            print(message.topic,message.payload)

subscribe.callback(on_message_print, "sensor/#", hostname="localhost")

kafkaConsumer.py

consumer = KafkaConsumer('mqtt.',
                     bootstrap_servers=['localhost:9092'])

for message in consumer:
   print(message)

Выход: mqttSubscriber.py

sensor / dist b '{"time":"12: 44: 30.817462", "val": 0} '

sensor / dist b' {"time": "12: 44: 32.820040", "val": 1} '

sensor / dist b '{"time": "12: 44: 34.822657", "val": 2}'

Выход: kafkaConsumer.py

ConsumerRecord (topic = 'mqtt.', Раздел = 0, смещение = 225, timestamp = 1545117270870, timestamp_type = 0, key = b '\ x00 \ x00 \ x00 \ x00 \ x01 \ x16sensor / dist' , value = b '\ x00 \ x00 \ x00 \ x00 \ x02J {"time": "12: 44: 30.817462", "val": 0} ', headers = [('mqtt.message.id ', b'0'), ('mqtt.qos', b'0 '), (' mqtt.retained ', b'false'), ('mqtt.duplicate', b'false ')],контрольная сумма = нет, serialized_key_size = 17, serialized_value_size = 43, serialized_header_size = 62)

ConsumerRecord (topic = 'mqtt.', partition = 0, смещение = 226, отметка времени = 1545117272821, отметка времени = 0, key = b '\ x00 \ x00 \ x00 \ x00 \ x01 \ x16sensor / dist' , value = b '\x00 \ x00 \ x00 \ x00 \ x02J {"время": "12: 44: 32.820040", "val": 1} ', заголовки = [(' mqtt.message.id ', b'0'), ('mqtt.qos', b'0 '), (' mqtt.retained ', b'false'), ('mqtt.duplicate', b'false ')], контрольная сумма = нет, serialized_key_size = 17, serialized_value_size= 43, serialized_header_size = 62)

ConsumerRecord (тема = 'mqtt.', Раздел = 0, смещение = 227, временная метка = 1545117274824, timestamp_type = 0, ключ = b '\ x00 \ x00 \x00 \ x00 \ x01 \ x16sensor / dist ', value = b' \ x00 \ x00 \ x00 \ x00 \ x02J {"time": "12: 44: 34.822657", "val": 2}', headers = [(' mqtt.message.id ', b'0'), ('mqtt.qos', b'0 '), (' mqtt.retained ', b'false'), ('mqtt.duplicate', b'false ')], контрольная сумма = нет, serialized_key_size = 17, serialized_value_size = 43, serialized_header_size = 62)

Что вызывает вышеуказанное добавление дополнительных байтов в Kafka Consumer?Заранее спасибо.

1 Ответ

0 голосов
/ 18 декабря 2018

В рамках демонстрации вы запускаете реестр схем

Запустите Kafka Connect и зависимости (Kafka, Zookeeper, Registry Schema):

confluent start connect

Если вы посмотрите на первые 5 байтов, вы увидите, что они начинаются с 0, а затем еще четыре байта, представляющих целое число.

См. Формат реестра реестра схемы и попробуйте сделать curl localhost:8081/subjects, чтобы увидеть, содержит ли оно название вашей темы для mqtt-key и mqtt-value.

Если вы не хотите использовать Avro, вам нужно будет настроить и отредактировать свойство Kafka Connect.файл для использования с другими конвертерами, и не используйте confluent start кроме запуска Kafka и Zookeeper

Или если вы хотите, чтобы Python десериализовал Avro, вы можете обратиться к репозиторию confluent-kafka-python на Github

...