Python3 paho-mqtt скорость против узлаjs mqtt - PullRequest
0 голосов
/ 13 ноября 2018

Я провел несколько тестов скорости для MQTT в Python3 и Node.js, используя af QoS level 0, и обнаружил, что Node.js заметно быстрее, чем реализация Python3. Как это может быть? Я открыт для использования любой инфраструктуры в качестве моста на стороне сервера для обработки данных от нескольких клиентов. Однако я теряю уверенность в том, что мне следует использовать Python3 для чего-либо на сервере.

Выполнение фрагментов кода.

Python3:

import paho.mqtt.client as mqtt
import logging
import time
import threading

import json
import sys



class MqttAdaptor(threading.Thread):

    def __init__(self,topic, type=None):
        threading.Thread.__init__(self)
        self.topic = topic

        self.client = None
        self.type = type




    def run(self):


        self.client = mqtt.Client(self.type)


        self.client.on_connect = self.on_connect
        self.client.on_disconnect = self.on_disconnect
        if self.type is not None:
            self.client.connect("localhost", 1883, 60)
            self.client.on_message = self.on_message
            self.client.loop_forever()  
        else:
            self.client.connect_async("localhost", 1883, 60)
            self.client.loop_start()


    # The callback for when the client receives a CONNACK response from the server.
    def on_connect(self,client, userdata, flags, rc):
        self.client.subscribe(self.topic)



    def on_disconnect(self, client, userdata, rc):
        if rc != 0:
            print("Unexpected disconnection from local MQTT broker")


    # The callback for when a PUBLISH message is received from the server.
    def on_message(self,client, userdata, msg):
        jsonMsg = ""
        try:    
            jsonMsg = json.loads(msg.payload)

            if jsonMsg['rssi'] is not None:
                jsonMsg['rssi'] = round(jsonMsg['rssi']*3.3 * 100000)/ 10000

        except:
            pass


        print(json.dumps(jsonMsg))




    def publish(self,topic, payload, qos=0,retain=False):

        self.client.publish(topic,payload,qos,retain)


    def close(self):
        if self.client is not None:
            self.client.loop_stop()
            self.client.disconnect()

if __name__=="__main__":

    topic = '/test/+/input/+'


    subber = MqttAdaptor(topic,'sub')
    subber.start()
    topic = None
    test = MqttAdaptor(topic)
    test.run()

    print("start")
    while True:
        data = sys.stdin.readline()
        if not len(data):
            print("BREAK")
            break
        msg = data.split('\t')
        topic = msg[0]
        test.publish(topic,msg[1],0)
    print("done")
    sys.exit(0)

Node.js:

"use strict";
const fs = require('fs');
const readline = require('readline');
const mqtt = require('mqtt');

const mqttClient = mqtt.connect();

mqttClient.on('connect', () => {
    console.error('==== MQTT connected ====');
    mqttClient.subscribe('/test/+/input/+');
});

mqttClient.on('close', () => {
    console.error('==== MQTT closed ====');
});

mqttClient.on('error', (error) => {
    console.error('==== MQTT error ' + error + ' ====');
});

mqttClient.on('offline', () => {
    console.error('==== MQTT offline ====');
});

mqttClient.on('reconnect', () => {
    console.error('==== MQTT reconnect ====');
});

mqttClient.on('message', (topic, message) => {
    const topicSegments = topic.split('/');
    topicSegments[topicSegments.length - 2] = 'done';
    topic = topicSegments.join('/');

    try {
        //The message might not always be valid JSON
        const json = JSON.parse(message);
        //If rssi is null/undefined in input, it should be left untouched
        if (json.rssi !== undefined && json.rssi !== null) {
            //Multiply by 3 and limit the number of digits after comma     to four
            json.rssi = Math.round(json.rssi * 3.3 * 10000) / 10000;
        }
        console.log(topic + "\t" + JSON.stringify(json));
    } catch (ex) {
        console.error('Error: ' + ex.message);
    }
});


const rl = readline.createInterface({
    input: process.stdin,
    terminal: false,
});

rl.on('line', (line) => {
    const lineSegments = line.split("\t");
    if (lineSegments.length >= 2) {
        const topic = lineSegments[0];
        const message = lineSegments[1];
        mqttClient.publish(topic, message);
    }
});

rl.on('error', () => {
    console.error('==== STDIN error ====');
    process.exit(0);
});

rl.on('pause', () => {
    console.error('==== STDIN paused ====');
    process.exit(0);
});

rl.on('close', () => {
    console.error('==== STDIN closed ====');
    process.exit(0);
});

Оба скрипта запускаются в командной строке, подключаясь к одному и тому же брокеру. Они запускаются с использованием скриптового канала (узла):

time cat test-performance.txt | pv -l -L 20k -q | nodejs index.js | pv -l | wc -l

и (python):

time cat test-performance.txt | pv -l -L 20k -q | python3 mqttTestThread.py | pv -l | wc -l

Тестовый файл содержит около 2 ГБ текста в этом формате:

/test/meny/input/test   {"sensor":"A1","data1":"176","time":1534512473545}

Как показано в сценариях, я считаю количество строк за время их выполнения. Для небольшого теста пропускная способность скрипта Python3 составляет примерно 3 кбит / с, а пропускная способность узла - примерно 20 кбит / с. Это большая разница. У кого-нибудь есть идеи почему? И / или как заставить Python работать с сопоставимой пропускной способностью?

...