Клиентское подключение к кластеру VerneMQ - PullRequest
0 голосов
/ 03 сентября 2018

Мы настраиваем кластер VerneMQ из 3 узлов (https://vernemq.com/docs/clustering/).

Проблема в Я не нашел ни одного примера того, как подключить клиентов ко всему кластеру .

например. для одного экземпляра VerneMQ клиентское соединение ( в python ):

client.connect("xxx.xxx.xxx.xxx", 1883, 60)

где "xxx.xxx.xxx.xxx" - адрес одного экземпляра.
В случае кластера есть ли способ указать адрес всего кластера вместо указания одного узла?

Ответы [ 2 ]

0 голосов
/ 26 сентября 2018

С моим клиентом, написанным с использованием Paho MQTT клиентской библиотеки , я в итоге использовал следующее расширение класса Client:

import random
from paho.mqtt.client import *


class ClusterClient(Client):
    """A subclass of paho.mqtt.Client that supports connecting to a cluster of
       mqtt brokers. connect() and connect_async() additionally accept a list
       of hostnames or host/port tuples:

           connect("host1")

           connect(["host1", "host2", "host3"]) # use default port 1883

           connect(["host1", ("host2", 8883), ("host3", 8883)])

       Hosts to connect to are chosen randomly. If a host disappears the client
       automatically connects to another host from the list.
    """

    def __init__(self, client_id="", clean_session=True, userdata=None,
            protocol=MQTTv311, transport="tcp"):
        super().__init__(client_id, clean_session, userdata, protocol, transport)
        self._hosts = []

    def connect_async(self, host, port=1883, keepalive=60, bind_address=""):
        if isinstance(host, (list, tuple)):
            self._hosts = [(t, 1883) if isinstance(t, str) else t for t in host]
        else:
            self._hosts = [(host, port)]

        for host, port in self._hosts:
            if host is None or len(host) == 0:
                raise ValueError('Invalid host.')
            if port <= 0:
                raise ValueError('Invalid port number.')

        host, port = random.choice(self._hosts)

        super().connect_async(host, port, keepalive, bind_address)

    def reconnect(self):
        hosts = self._hosts[:]
        random.shuffle(hosts)
        while True:
            self._host, self._port = hosts.pop(0)
            try:
                return super().reconnect()
            except socket.error:
                if not hosts:
                    raise 
0 голосов
/ 04 сентября 2018

Не существует такого понятия, как «конкретный адрес всего кластера». (для VerneMQ и для любого другого программного обеспечения кластерного сервера).

Если вам нужна единая точка контакта, вам нужен балансировщик нагрузки перед кластером. Затем вы можете использовать различные стратегии балансировки нагрузки.

VerneMQ, например, хорошо работает с HAProxy и поддерживает протокол прокси.

Надеюсь, это поможет.

...