Возможность с помощью Kafka-Connector получать данные из Python -Opcua-Client в Kafka-тему - PullRequest
0 голосов
/ 19 февраля 2020

что есть python opcua client. datachange_notification(self, node, val, data) будет выполняться как часть python opcua client всякий раз, когда новые данные поступают к client, см. Скрипт ниже, возможно ли отправить поступившие данные (в данном контексте val) на kafka-topi c.

    def datachange_notification(self, node, val, data):
        saveData = save_file.Savefile()
        #save data locally when data arrived according to the nodeid=2
        if str(node) == "Node(NumericNodeId(ns=2;i=2))":
            if val != None and val != 0:
                print("Python: New data change event", node, val)
                saveData.saveBlockByClient(val)
        #save data locally when data arrived according to the nodeid=3
        if str(node) == "Node(NumericNodeId(ns=2;i=3))":
            if val != None and val != 0:
                print("Python: New data change event", node, val)
                saveData.saveSourceByValidator(str(val))

Ответы [ 2 ]

0 голосов
/ 25 февраля 2020

Пожалуйста, попробуйте, если это работает для kafka- python package-

from kafka import KafkaProducer

class KafkaOpcUaSubHandler(object): 
    def __init__(self):
        self.msg={}

    def connect_kafka(self, server, port):
        kafka_producer = KafkaProducer(
                bootstrap_server ='{}:{}'.format(server, port),
                retries=0, batch_size=0, compression_type=None
            )


    def kafka_message_producer(self, kafka_topic, msg):
        kafka_producer = self.connect_kafka(kafka_server, kafka_port)
        your_kafka_message = kafka_producer
                             .send(kafka_topic, json.dumps(msg, default=str)
                             .encode('utf-8'))
        kafka_producer.flush()


    def datachange_notification(self, node, val, data):
       self.your_function(node, val, self.msg, data)

    def your_function(self, node, val, msg, data):
        payload = []
        payload = [val, str(data.monitored_item.Value.SourceTimestamp)]
        self.kafka_message_producer(kafka_topic, payload)
0 голосов
/ 19 февраля 2020

Возможно ли это? Конечно.

Импортируйте библиотеку Kafka и создайте новый экземпляр Producer, затем отправьте сообщение

Если вы используете клиент AsyncIO, упомянутый в комментариях, я бы предложил изучить библиотеку aiokafka python

...