Тестирование MQTT-брокера путем подключения 100 000 клиентов - PullRequest
0 голосов
/ 10 сентября 2018

Я давно работаю над протоколом MQTT. Этот протокол будет использоваться организацией для отправки подтверждающих сообщений каждому клиенту в отдельности.

В сценарии использования есть только 1 клиент-издатель, который публикует сообщения-подтверждения клиентам-подписчикам, подписавшимся на их уникальные темы

Чтобы убедиться, что брокер по большей части не содержит ошибок и может легко масштабироваться в будущем, я пытался протестировать брокеров с открытым исходным кодом Emqx и Vernemq, пытаясь подключить к ним как минимум 50 000 клиентов.

Однако я не могу создать столько соединений. Моя Ubuntu 18.04 Экземпляр (8-ядерный ЦП, 15 ГБ ОЗУ) в Google Cloud не может установить более успешное соединение после 300-400.

Я попытался внести следующие изменения: ulimit -n 64500 (для разрешения множества файловых дескрипторов, поскольку для каждого соединения с сокетом требуется файловый дескриптор)

Пожалуйста, помогите мне установить более 50 000 соединений. Должен ли я запускать 'n' количество потоков и запускать клиенты total_clients / total_threads в цикле в каждом потоке?

или я должен создать один поток для каждого клиентского соединения?

Что мне делать?

Следующее сообщение появляется в теме «$ SYS / #», как только клиенты начинают отключаться, даже если клиентская сторона не отправляет пакет данных отключения. $SYS/brokers/emqx@127.0.0.1/clients/112/disconnected {"clientid": "112", "username": "undefined", "reason": "closed", "ts": 1536587647}

import paho.mqtt.client as mqtt
from threading import Lock
import time



print_lock = Lock()
def s_print(str):
    print_lock.acquire()
    print(str)
    print_lock.release()


def on_connect(client, userdata, flags, rc):
    client_id = userdata["client_id"]
    if (rc == 0):
        s_print("connected " + client_id)
        client.subscribe(client_id, 2)

def on_disconnect(client, userdata, rc):
        client_id = userdata["client_id"]
        s_print("disconnected: " + client_id + " reason " + str(rc))



def on_message(client, userdata, message):
    topic = message.topic
    payload = str(message.payload)
    s_print("Recieved " + payload + " on " + topic)


if __name__ == '__main__':
    n_clients = int(input("Enter no. of clients:    "))

    for i in range(n_clients):
        client_id = str(i)
        s_print(client_id)
        userdata = {
                "client_id"  : client_id
            }

        client = mqtt.Client(client_id=client_id, clean_session=True, userdata=userdata)
        client.on_connect  = on_connect
        client.on_disconnect =on_disconnect
        client.on_message = on_message
        client.connect("35.228.57.228", 1883, 60)
        client.loop_start()    
        time.sleep(0.5)    

while(1):
    time.sleep(5)
...