Создание 10К подключений и отправка данных - PullRequest
0 голосов
/ 10 июля 2020

Я провожу некоторое тестирование с использованием Thingsboard (платформа IoT с открытым исходным кодом), в основном, я пытаюсь увидеть его производительность, а также производительность базы данных cassandra и postgresql базы данных при получении данных с нескольких устройств.

В настоящее время я отправляю данные через paho-mqtt, а до этого я правильно создал устройства. Я могу отправлять данные каждые 10 секунд для 200 устройств в течение длительного периода времени. Однако моя цель - масштабировать его и видеть производительность при получении данных с устройств 10K. Но, если я хочу протестировать (используя 1000 устройств), я вообще не могу подключиться к ним. Есть ли какие-либо ограничения на количество устройств / клиентов, которые paho-mqtt может создавать с помощью потоков?

Я читал, что это может быть связано с количеством дескрипторов открытых файлов, и когда я вижу свой предел (с ulimit -a), я вижу, что мой лимит составляет 1024. Однако я могу запросить дополнительные процессоры, а когда я использую 4 процессора, мой лимит составляет 4096. Однако, когда я пытаюсь запустить свой скрипт, большую часть времени, подключается только один или два клиента, а остальные (998 клиентов) отключены.

Я делаю что-то не так с моим кодом?

import paho.mqtt.client as mqtt
import time
import threading
import logging
import thingsboard_objects as Things
import random
import datetime
logging.basicConfig(level=logging.INFO)


init_time = time.time()


def Connect(client, broker, port, token, keepalive, run_forever=False):
    connflag = False
    delay = 5
    print("connecting ",client)
    badcount = 0  # counter for bad connection attempts
    while not connflag:
        print(logging.info("connecting to broker " + str(broker)))
        # print("connecting to broker "+str(broker)+":"+str(port))
        print("Attempts ", str(badcount))
        time.sleep(2)
        try:
            client.username_pw_set(token)
            client.connect(broker, port, keepalive)
            connflag = True

        except:
            client.badconnection_flag = True
            logging.info("connection failed " + str(badcount))
            badcount += 1
            if badcount >= 3 and not run_forever:
                return -1
                raise SystemExit  # give up

    return 0


def wait_for(client, msgType, period=2, wait_time=20, running_loop=False):
    """Will wait for a particular event gives up after period*wait_time, Default=10
seconds.Returns True if succesful False if fails"""
    # running loop is true when using loop_start or loop_forever
    client.running_loop = running_loop  #
    wcount = 0
    while True:
        logging.info("waiting" + msgType)
        if msgType == "CONNACK":
            if client.on_connect:
                if client.connected_flag:
                    return True
                if client.bad_connection_flag:  #
                    return False

        if msgType == "SUBACK":
            if client.on_subscribe:
                if client.suback_flag:
                    return True
        if msgType == "MESSAGE":
            if client.on_message:
                if client.message_received_flag:
                    return True
        if msgType == "PUBACK":
            if client.on_publish:
                if client.puback_flag:
                    return True

        if not client.running_loop:
            client.loop(.01)  # check for messages manually
        time.sleep(period)
        wcount += 1
        if wcount > wait_time:
            print("return from wait loop taken too long")
            return False
    return True


def client_loop(client, broker, port, token, keepalive=600, loop_function=None,
                loop_delay=10, run_forever=False):
    """runs a loop that will auto reconnect and subscribe to topics
    pass topics as a list of tuples. You can pass a function to be
    called at set intervals determined by the loop_delay
    """
    client.run_flag = True
    client.broker = broker
    print("running loop ")
    client.reconnect_delay_set(min_delay=1, max_delay=12)

    while client.run_flag:  # loop forever

        if client.bad_connection_flag:
            break
        if not client.connected_flag:
            print("Connecting to " + broker)
            if Connect(client, broker, port, token, keepalive, run_forever) != -1:
                if not wait_for(client, "CONNACK"):
                    client.run_flag = False  # break no connack
            else:  # connect fails
                client.run_flag = False  # break
                print("quitting loop for  broker ", broker)

        client.loop(0.01)

        if client.connected_flag and loop_function:  # function to call
            loop_function(client, loop_delay)  # call function

    time.sleep(1)
    print("disconnecting from", broker)
    if client.connected_flag:
        client.disconnect()
        client.connected_flag = False


def on_log(client, userdata, level, buf):
    print(buf)


#def on_message(client, userdata, message):
#    time.sleep(1)
#    print("message received", str(message.payload.decode("utf-8")))


def on_connect(client, userdata, flags, rc):
    if rc == 0:
        client.connected_flag = True  # set flag
        for c in clients:
            if client == c["client"]:
                if c["sub_topic"] != "":
                    client.subscribe(c["sub_topic"])

                    print("connected OK")
    else:
        print("Bad connection Returned code=", rc)
        client.loop_stop()


def on_disconnect(client, userdata, rc):
    client.connected_flag = False  # set flag
    # print("client disconnected ok")


def on_publish(client, userdata, mid):
    print("In on_pub callback mid= ", mid)


def pub(client, loop_delay):

    rmd_current = round(random.uniform(0.6, 50.0), 2)
    rmd_pressure = round(random.uniform(0.6, 50.0), 2)
    global init_time
    if time.time() - init_time >= 3600:
        rmd_mnc = round(random.uniform(5.0, 30.0), 2)
        rmd_sdc = round(random.random(), 2)
        rmd_mnp = round(random.uniform(5.0, 30.0), 2)
        rmd_sdp = round(random.random(), 2)

        client.publish('v1/devices/me/telemetry',
                       '{"Current": "%s","Pressure": "%s","Str": "12341","Stp": "12340","AL1": "~","AL2": "~",'
                       '"AL3": "~","AL4": "~","AL5": "~","AL6": "~","AL7": "~","AL8": "~"}' % (rmd_current, rmd_pressure))
        client.publish('v1/devices/me/telemetry',
                       '{"MnC": "%s", "SdC": "%s", "Str": "2554","Stp": "2554", '
                       '"MnP": "%s", "SdP": "%s"}' % (rmd_mnc, rmd_sdc, rmd_mnp, rmd_sdp))

        init_time = time.time()
    else:
        client.publish('v1/devices/me/telemetry',
                       '{"Current": "%s","Pressure": "%s","Str": "12341","Stp": "12340","AL1": "~","AL2": "~",'
                       '"AL3": "~","AL4": "~","AL5": "~","AL6": "~","AL7": "~","AL8": "~"}' % (rmd_current, rmd_pressure))
    print(datetime.datetime.now())
    time.sleep(loop_delay)
    pass


def Create_connections():
    for i in range(n_clients):
        cname = "client" + str(i)
        t = int(time.time())
        client_id = cname + str(t)  # create unique client_id
        client = mqtt.Client(client_id)  # create new instance
        clients[i]["client"] = client
        clients[i]["client_id"] = client_id
        clients[i]["cname"] = cname
        broker = clients[i]["broker"]
        port = clients[i]["port"]
        token = clients[i]["token"]
        client.on_connect = on_connect
        client.on_disconnect = on_disconnect
        client.on_publish = on_publish
        #client.on_message = on_message
        t = threading.Thread(target=client_loop, args=(client, broker, port, token, 600, pub))
        threads.append(t)
        t.start()


if __name__ == '__main__':

    #things_location = input("What type of thingsboard installation are you working with (demo/local)? ")
    things_location = "local"
    if things_location == "local":
        type_install = 'cseetprj03.essex.ac.uk:8080'
        broker = 'cseetprj03.essex.ac.uk'
    else:
        type_install = broker = 'demo.thingsboard.io'

    header = Things.get_credentials(things_location)
    my_devices = Things.get_devices_id(header, type_install)

    clients = []
    for device in my_devices:
        device_info = {"broker": broker, "port": 1883, "name": device["name"],
                       "token": Things.get_device_token(device["id"]["id"], header, type_install)}
        clients.append(device_info)

    n_clients = len(clients)
    mqtt.Client.connected_flag = False  # create flag in class
    mqtt.Client.bad_connection_flag = False  # create flag in class

    threads = []
    print("Creating Connections ")
    no_threads = threading.active_count()
    print("current threads =", no_threads)
    print("Publishing ")
    Create_connections()

    print("All clients connected ")
    no_threads = threading.active_count()
    print("current threads =", no_threads)
    print("starting main loop")
    try:
        while no_threads == 1001:
            time.sleep(10)
            no_threads = threading.active_count()
            print("current threads =", no_threads)
            for c in clients:
                if not c["client"].connected_flag:
                    print("broker ", c["broker"], " is disconnected")

    except KeyboardInterrupt:
        print("ending")
        for c in clients:
            c["client"].run_flag = False
    time.sleep(10)

Заранее спасибо

1 Ответ

0 голосов
/ 13 июля 2020

Для начала, из проверки кода

t = int(time.time())

вернет одно и то же значение для многих итераций через l oop, в результате чего идентификатор клиента будет одинаковым. Несколько клиентов с одним и тем же идентификатором будут отключены друг от друга.

Когда вы решите этот вопрос, вы обнаружите, что Python не является многопроцессорной многопоточностью, если вы не используете пакет многопроцессорной обработки на основе процессов. Скорее всего, вашей производительности будет недостаточно для поддержки 10k потоков.

Кстати, зачем изобретать колесо симулятора MQTT? Отказ от ответственности: мы продаем такой товар, а 10к подключений - это банально.

...