Подписка на сообщения MQTT с помощью Goroutine - PullRequest
0 голосов
/ 28 июня 2018

В настоящее время у меня есть код Go, на который можно подписаться и распечатать данные датчика, опубликованные в определенной теме. Вот мой код:

package main

import (
    "crypto/tls"
    "flag"
    "fmt"
    //"log"
    "os"
    "os/signal"
    "strconv"
    "syscall"
    "time"
    MQTT "github.com/eclipse/paho.mqtt.golang"
)

func onMessageReceived(client MQTT.Client, message MQTT.Message) {
    //fmt.Printf("Received message on topic: %s\nMessage: %s\n", message.Topic(), message.Payload())
    fmt.Printf("%s\n", message.Payload())
}

func main() {
    //MQTT.DEBUG = log.New(os.Stdout, "", 0)
    //MQTT.ERROR = log.New(os.Stdout, "", 0)
    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt, syscall.SIGTERM)

    hostname, _ := os.Hostname()

    server := flag.String("server", "tcp://test.mosquitto.org:1883", "The full url of the MQTT server to connect to ex: tcp://127.0.0.1:1883")
    topic := flag.String("topic", "topic/sensorTemperature", "Topic to subscribe to")


    qos := flag.Int("qos", 0, "The QoS to subscribe to messages at")
    clientid := flag.String("clientid", hostname+strconv.Itoa(time.Now().Second()), "A clientid for the connection")
    username := flag.String("username", "", "A username to authenticate to the MQTT server")
    password := flag.String("password", "", "Password to match username")
    flag.Parse()

    connOpts := MQTT.NewClientOptions().AddBroker(*server).SetClientID(*clientid).SetCleanSession(true)
    if *username != "" {
        connOpts.SetUsername(*username)
        if *password != "" {
            connOpts.SetPassword(*password)
        }
    }
    tlsConfig := &tls.Config{InsecureSkipVerify: true, ClientAuth: tls.NoClientCert}
    connOpts.SetTLSConfig(tlsConfig)

    connOpts.OnConnect = func(c MQTT.Client) {
        if token := c.Subscribe(*topic, byte(*qos), onMessageReceived); token.Wait() && token.Error() != nil {
            panic(token.Error())
        }
    }

    client := MQTT.NewClient(connOpts)
    if token := client.Connect(); token.Wait() && token.Error() != nil {
        panic(token.Error())
    } else {
        fmt.Printf("Connected to %s\n", *server)
    }

    <-c
}

Вместо подписки на подобные сообщения я хочу поместить часть кода, которая подписывается в Goroutine . Я хочу иметь возможность звонить go func onMessageReceived. Как я могу это сделать, если эта функция вызывается в c.Subscribe? И как я могу добавить параметр sync.WaitGroup в? Спасибо.

1 Ответ

0 голосов
/ 28 июня 2018

Поскольку вы передаете функцию в качестве параметра другой функции, вы не можете контролировать способ ее вызова. Тем не менее, у вас есть полный контроль над тем, что происходит внутри функции - это означает, что вы можете запустить там программу:

func onMessageReceived(client MQTT.Client, message MQTT.Message) {
    go func() {
        fmt.Printf("%s\n", message.Payload())
    }()
}

Таким образом, onMessageReceived сам по-прежнему будет вызываться MQTT синхронно, но он просто запустит программу и немедленно вернется. Вы также можете определить отдельную функцию и вызвать ее с помощью go вместо анонимной функции:

func onMessageReceived(client MQTT.Client, message MQTT.Message) {
    go messageHandler(client, message)
}

func messageHandler(client MQTT.Client, message MQTT.Message) {
    fmt.Printf("%s\n", message.Payload())
}

Это просто вопрос того, как вы хотите организовать свой код. Если это короткий обработчик, я бы, вероятно, придерживался анонимной функции (достаточно короткой, чтобы вы могли видеть весь анонимный функционал на одном экране); для более длинной функции я бы разбил ее или разделил на именованную функцию.

Поскольку вы не можете передавать какие-либо дополнительные параметры, если вы хотите использовать WaitGroup, он должен быть глобальным:

var wg = new(sync.WaitGroup)

func onMessageReceived(client MQTT.Client, message MQTT.Message) {
    wg.Add(1)
    go func() {
        defer wg.Done()
        fmt.Printf("%s\n", message.Payload())
    }()
}
...