Пожалуйста, проверьте, работает ли это с пакетом kafka-python:
import json

from kafka import KafkaProducer
class KafkaOpcUaSubHandler(object):
    """Forward data-change notifications to a Kafka topic as JSON.

    Each notification is turned into a ``[value, source_timestamp]`` list,
    serialized to UTF-8 encoded JSON and sent through a ``KafkaProducer``.

    NOTE(review): presumably an instance is registered as an OPC UA
    subscription handler (``datachange_notification`` is the callback) --
    confirm against the code that creates the subscription.
    """

    def __init__(self):
        # Passed through unchanged to ``your_function`` on every notification.
        self.msg = {}
        # Created lazily on the first send and reused afterwards, so a new
        # broker connection is not opened (and leaked) for every message.
        self._producer = None

    def connect_kafka(self, server, port):
        """Create and return a ``KafkaProducer`` for ``server:port``.

        Args:
            server: Host name or address of the Kafka broker.
            port: Port of the Kafka broker.

        Returns:
            A new ``KafkaProducer`` configured with ``retries=0``,
            ``batch_size=0`` and no compression.
        """
        return KafkaProducer(
            bootstrap_servers='{}:{}'.format(server, port),
            retries=0,
            batch_size=0,
            compression_type=None
        )

    @staticmethod
    def _encode_message(msg):
        """Serialize ``msg`` to UTF-8 encoded JSON bytes.

        Objects that ``json`` cannot serialize natively are converted with
        ``str`` (``default=str``).
        """
        return json.dumps(msg, default=str).encode('utf-8')

    def kafka_message_producer(self, kafka_topic, msg):
        """Send ``msg`` as JSON to ``kafka_topic`` and flush the producer.

        Args:
            kafka_topic: Name of the topic to publish to.
            msg: JSON-serializable payload (see ``_encode_message``).
        """
        # ``getattr`` keeps subclasses working even if they override
        # ``__init__`` without calling the base implementation.
        producer = getattr(self, '_producer', None)
        if producer is None:
            # NOTE(review): ``kafka_server`` and ``kafka_port`` are not
            # defined in this class; they are expected to exist as
            # module-level names -- confirm where they are configured.
            producer = self.connect_kafka(kafka_server, kafka_port)
            self._producer = producer
        producer.send(kafka_topic, self._encode_message(msg))
        # Flush after every message so it is pushed out before returning.
        producer.flush()

    def datachange_notification(self, node, val, data):
        """Handle a data-change event by delegating to ``your_function``.

        Args:
            node: The node the notification refers to (passed through).
            val: The new value.
            data: Event object; must expose
                ``monitored_item.Value.SourceTimestamp``.
        """
        self.your_function(node, val, self.msg, data)

    def your_function(self, node, val, msg, data):
        """Build the ``[value, timestamp]`` payload and publish it.

        Args:
            node: Unused here; kept for the callback signature.
            val: The value to publish.
            msg: Unused here; kept for the callback signature.
            data: Event object providing
                ``monitored_item.Value.SourceTimestamp``.
        """
        payload = [val, str(data.monitored_item.Value.SourceTimestamp)]
        # NOTE(review): ``kafka_topic`` is not defined in this class; it is
        # expected to exist as a module-level name -- confirm.
        self.kafka_message_producer(kafka_topic, payload)