У нас есть производитель кафки для данных mqtt.Ниже приведен мой скрипт на python:
import paho.mqtt.client as mqtt
import os
import ssl
import argparse
import threading, logging, time
import multiprocessing
from kafka import KafkaProducer,
from location_pb2 import Locations
def on_connect(mqttc, obj, flags, rc):
print("rc: " + str(rc) + " MQTT Connected")
def on_message(mqttc, obj, msg):
#print(msg.topic + " " + str(msg.qos) + " " + str(msg.payload))
send_message_to_kafka(msg.payload)
def send_message_to_kafka(message):
producer = KafkaProducer(bootstrap_servers='xxxx07:9092,xxxx04:9092, xxxx05:9092', api_version='(0,10)',security_protocol='SASL_PLAINTEXT', sasl_plain_username='user', sasl_plain_password='pass', sasl_mechanism='PLAIN' )
producer.send('my-topic',message)
print("sending message to kafka: %s" % message)
producer.flush()
#producer.close
metrics = producer.metrics()
print(metrics)
Когда я пытался напечатать сообщение перед отправкой производителю
print(msg.topic + " " + str(msg.qos) + " " + str(msg.payload))
, данные поступают как показано ниже
1.0.0/LOC/SPOT_GOB/xxxx-xxxx/GPB_LOCR 0 b'\n+\n\txxxx-xxxx\x12\x0c6CC7ECA59000\x1d\xb8~\xe0c%\x008\x98C(\x020\xb0\xe5\xac\xe8x05'
но когда я попытался отправить функции-производителю: send_message_to_kafka (msg.payload)
0 сообщение пишется в тему кафки.