Как правильно использовать этот код (Paho MQTT) в качестве GoRoutine и передавать сообщения через канал для публикации через веб-сокеты - PullRequest
0 голосов
/ 01 октября 2019

В качестве стандартного кода, который я использую для публикации сообщения в целях тестирования:

func main() {

    opts := MQTT.NewClientOptions().AddBroker("tcp://127.0.0.1:1883")
    opts.SetClientID("myclientid_")
    opts.SetDefaultPublishHandler(f)
    opts.SetConnectionLostHandler(connLostHandler)

    opts.OnConnect = func(c MQTT.Client) {
        fmt.Printf("Client connected, subscribing to: test/topic\n")

        if token := c.Subscribe("logs", 0, nil); token.Wait() && token.Error() != nil {
            fmt.Println(token.Error())
            os.Exit(1)
        }
    }

    c := MQTT.NewClient(opts)
    if token := c.Connect(); token.Wait() && token.Error() != nil {
        panic(token.Error())
    }


    for i := 0; i < 5; i++ {
        text := fmt.Sprintf("this is msg #%d!", i)
        token := c.Publish("logs", 0, false, text)
        token.Wait()
    }

    time.Sleep(3 * time.Second)

    if token := c.Unsubscribe("logs"); token.Wait() && token.Error() != nil {
        fmt.Println(token.Error())
        os.Exit(1)
    }

    c.Disconnect(250)
}

Это хорошо работает! но при массовой передаче сообщений при выполнении задач с высокой задержкой производительность моей программы будет низкой, поэтому я должен использовать goroutine и channel.

Итак, я искал способ сделать Worker внутри goroutine для ПУБЛИКАЦИИ сообщений в браузер, используя библиотеку Paho MQTT для GOlang, мне было трудно найти лучшее решение, которое отвечало бы моим потребностям, но послеВ некоторых поисках я нашел этот код:

package main

import (
    "crypto/tls"
    "crypto/x509"
    "fmt"
    "io/ioutil"
    "strings"
    "time"

    MQTT "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git"
    "linksmart.eu/lc/core/catalog"
    "linksmart.eu/lc/core/catalog/service"
)

// MQTTConnector provides MQTT protocol connectivity
type MQTTConnector struct {
    config        *MqttProtocol
    clientID      string
    client        *MQTT.Client
    pubCh         chan AgentResponse
    subCh         chan<- DataRequest
    pubTopics     map[string]string
    subTopicsRvsd map[string]string // store SUB topics "reversed" to optimize lookup in messageHandler
}

const defaultQoS = 1

func (c *MQTTConnector) start() {
    logger.Println("MQTTConnector.start()")

    if c.config.Discover && c.config.URL == "" {
        err := c.discoverBrokerEndpoint()
        if err != nil {
            logger.Println("MQTTConnector.start() failed to start publisher:", err.Error())
            return
        }
    }

    // configure the mqtt client
    c.configureMqttConnection()

    // start the connection routine
    logger.Printf("MQTTConnector.start() Will connect to the broker %v\n", c.config.URL)
    go c.connect(0)

    // start the publisher routine
    go c.publisher()
}

// reads outgoing messages from the pubCh und publishes them to the broker
func (c *MQTTConnector) publisher() {
    for resp := range c.pubCh {
        if !c.client.IsConnected() {
            logger.Println("MQTTConnector.publisher() got data while not connected to the broker. **discarded**")
            continue
        }
        if resp.IsError {
            logger.Println("MQTTConnector.publisher() data ERROR from agent manager:", string(resp.Payload))
            continue
        }
        topic := c.pubTopics[resp.ResourceId]
        c.client.Publish(topic, byte(defaultQoS), false, resp.Payload)
        // We dont' wait for confirmation from broker (avoid blocking here!)
        //<-r
        logger.Println("MQTTConnector.publisher() published to", topic)
    }
}


func (c *MQTTConnector) stop() {
    logger.Println("MQTTConnector.stop()")
    if c.client != nil && c.client.IsConnected() {
        c.client.Disconnect(500)
    }
}

func (c *MQTTConnector) connect(backOff int) {
    if c.client == nil {
        logger.Printf("MQTTConnector.connect() client is not configured")
        return
    }
    for {
        logger.Printf("MQTTConnector.connect() connecting to the broker %v, backOff: %v sec\n", c.config.URL, backOff)
        time.Sleep(time.Duration(backOff) * time.Second)
        if c.client.IsConnected() {
            break
        }
        token := c.client.Connect()
        token.Wait()
        if token.Error() == nil {
            break
        }
        logger.Printf("MQTTConnector.connect() failed to connect: %v\n", token.Error().Error())
        if backOff == 0 {
            backOff = 10
        } else if backOff <= 600 {
            backOff *= 2
        }
    }

    logger.Printf("MQTTConnector.connect() connected to the broker %v", c.config.URL)
    return
}

func (c *MQTTConnector) onConnected(client *MQTT.Client) {
    // subscribe if there is at least one resource with SUB in MQTT protocol is configured
    if len(c.subTopicsRvsd) > 0 {
        logger.Println("MQTTPulbisher.onConnected() will (re-)subscribe to all configured SUB topics")

        topicFilters := make(map[string]byte)
        for topic, _ := range c.subTopicsRvsd {
            logger.Printf("MQTTPulbisher.onConnected() will subscribe to topic %s", topic)
            topicFilters[topic] = defaultQoS
        }
        client.SubscribeMultiple(topicFilters, c.messageHandler)
    } else {
        logger.Println("MQTTPulbisher.onConnected() no resources with SUB configured")
    }
}

func (c *MQTTConnector) onConnectionLost(client *MQTT.Client, reason error) {
    logger.Println("MQTTPulbisher.onConnectionLost() lost connection to the broker: ", reason.Error())

    // Initialize a new client and reconnect
    c.configureMqttConnection()
    go c.connect(0)
}

func (c *MQTTConnector) configureMqttConnection() {
    connOpts := MQTT.NewClientOptions().
        AddBroker(c.config.URL).
        SetClientID(c.clientID).
        SetCleanSession(true).
        SetConnectionLostHandler(c.onConnectionLost).
        SetOnConnectHandler(c.onConnected).
        SetAutoReconnect(false) // we take care of re-connect ourselves

    // Username/password authentication
    if c.config.Username != "" && c.config.Password != "" {
        connOpts.SetUsername(c.config.Username)
        connOpts.SetPassword(c.config.Password)
    }

    // SSL/TLS
    if strings.HasPrefix(c.config.URL, "ssl") {
        tlsConfig := &tls.Config{}
        // Custom CA to auth broker with a self-signed certificate
        if c.config.CaFile != "" {
            caFile, err := ioutil.ReadFile(c.config.CaFile)
            if err != nil {
                logger.Printf("MQTTConnector.configureMqttConnection() ERROR: failed to read CA file %s:%s\n", c.config.CaFile, err.Error())
            } else {
                tlsConfig.RootCAs = x509.NewCertPool()
                ok := tlsConfig.RootCAs.AppendCertsFromPEM(caFile)
                if !ok {
                    logger.Printf("MQTTConnector.configureMqttConnection() ERROR: failed to parse CA certificate %s\n", c.config.CaFile)
                }
            }
        }
        // Certificate-based client authentication
        if c.config.CertFile != "" && c.config.KeyFile != "" {
            cert, err := tls.LoadX509KeyPair(c.config.CertFile, c.config.KeyFile)
            if err != nil {
                logger.Printf("MQTTConnector.configureMqttConnection() ERROR: failed to load client TLS credentials: %s\n",
                    err.Error())
            } else {
                tlsConfig.Certificates = []tls.Certificate{cert}
            }
        }

        connOpts.SetTLSConfig(tlsConfig)
    }

    c.client = MQTT.NewClient(connOpts)
}

Этот код делает именно то, что я ищу!

Но, как нуб на Голанге, я не могу понять, как запустить START() функция внутри моей основной функции и какой аргумент для передачи!

И особенно, как я буду обрабатывать передачу сообщений работнику (издателю) по каналу?!

Ваша помощь будет оценена по достоинству!

Ответы [ 2 ]

1 голос
/ 06 октября 2019

Я разместил ответ ниже на репозитории github , но, поскольку вы задали тот же вопрос, я подумал, что его стоит опубликовать (с дополнительной информацией).

Когда вы говорите «массовая передача сообщений при выполнении задач с высокой задержкой», я предполагаю, что вы имеете в виду, что вы хотите отправлять сообщения асинхронно (поэтому сообщение обрабатывается другой подпрограммой go, чем выполняется ваш основной код).

Если это так, то очень простое изменение вашего первоначального примера даст вам следующее:

for i := 0; i < 5; i++ {
        text := fmt.Sprintf("this is msg #%d!", i)
        token := c.Publish("logs", 0, false, text)
        // comment out... token.Wait()
    }

Примечание. Код вашего примера может завершиться до того, как сообщения будут на самом делепослал;добавление time.Sleep (10 * time.Second) дало бы ему время, чтобы они вышли;см. код ниже для другого способа обработки этого

Единственная причина, по которой ваш исходный код остановился до отправки сообщения, заключалась в том, что вы вызвали token.Wait (). Если вас не волнуют ошибки (и вы не проверяете их, поэтому я полагаю, что вас это не волнует), тогда нет смысла вызывать token.Wait () (он просто ждет, пока сообщение отправлено; сообщение исчезнет)вызываете ли вы token.Wait () или нет).

Если вы хотите регистрировать любые ошибки, вы можете использовать что-то вроде:

for i := 0; i < 5; i++ {
        text := fmt.Sprintf("this is msg #%d!", i)
        token := c.Publish("logs", 0, false, text)
        go func(){
            token.Wait()
            err := token.Error()
            if err != nil {
                fmt.Printf("Error: %s\n", err.Error()) // or whatever you want to do with your error
            }
        }()
    }

Обратите внимание, что есть еще несколько вещей, которые вынужно сделать, если доставка сообщения критична (но поскольку вы не проверяете ошибки, я предполагаю, что это не так).

С точки зрения кода, который вы нашли;Я подозреваю, что это добавит сложности, которая вам не нужна (и для ее решения потребуется дополнительная информация; например, структура MqttProtocol не определена в вставленном вами бите).

Дополнительный бит ... В вашемкомментарии, которые вы упомянули "Опубликованные сообщения должны быть заказаны". Если это важно (поэтому вы хотите подождать, пока каждое сообщение не будет доставлено, прежде чем отправлять другое), вам нужно что-то вроде:

msgChan := make(chan string, 200) // Allow a queue of up to 200 messages
var wg sync.WaitGroup
wg.Add(1)
go func(){ // go routine to send messages from channel
    for msg := range msgChan {
        token := c.Publish("logs", 2, false, msg) // Use QOS2 is order is vital
        token.Wait()
        // should check for errors here
    }
    wg.Done()
}()

for i := 0; i < 5; i++ {
        text := fmt.Sprintf("this is msg #%d!", i)
        msgChan <- text
    }
close(msgChan) // this will stop the goroutine (when all messages processed)
wg.Wait() // Wait for all messages to be sent before exiting (may wait for ever is mqtt broker down!)

Примечание: это похоже на решение от Ильи Казначеева (если вы установите параметру workerPoolSize значение 1 и сделаете канал буферизованным)

Поскольку ваши комментарии указывают на то, что группа ожидания делает это трудным для понимания, это еще один способ ожидания, который может быть более понятным (группы ожидания обычно используются, когдавы ожидаете, что несколько вещей станут финскими; в этом примере мы ждем только одного, поэтому можно использовать более простой подход)

msgChan := make(chan string, 200) // Allow a queue of up to 200 messages
done := make(chan struct{}) // channel used to indicate when go routine has finnished

go func(){ // go routine to send messages from channel
    for msg := range msgChan {
        token := c.Publish("logs", 2, false, msg) // Use QOS2 is order is vital
        token.Wait()
        // should check for errors here
    }
    close(done) // let main routine know we have finnished
}()

for i := 0; i < 5; i++ {
        text := fmt.Sprintf("this is msg #%d!", i)
        msgChan <- text
    }
close(msgChan) // this will stop the goroutine (when all messages processed)
<-done // wait for publish go routine to complete
0 голосов
/ 03 октября 2019

Почему бы вам просто не разделить отправку сообщений на несколько рабочих?

Примерно так:

...
    const workerPoolSize = 10 // the number of workers you want to have
    wg := &sync.WaitGroup{}
    wCh := make(chan string)
    wg.Add(workerPoolSize) // you want to wait for 10 workers to finish the job

    // run workers in goroutines
    for i := 0; i < workerPoolSize; i++ {
        go func(wch <-chan string) {
            // get the data from the channel
            for text := range wch {
                c.Publish("logs", 0, false, text)
                token.Wait()
            }
            wg.Done() // worker says that he finishes the job
        }(wCh)
    }

    for i := 0; i < 5; i++ {
        // put the data to the channel
        wCh <- fmt.Sprintf("this is msg #%d!", i)
    }

        close(wCh)
    wg.Wait() // wait for all workers to finish
...
...