Я разместил ответ ниже на репозитории github , но, поскольку вы задали тот же вопрос, я подумал, что его стоит опубликовать (с дополнительной информацией).
Когда вы говорите «массовая передача сообщений при выполнении задач с высокой задержкой», я предполагаю, что вы имеете в виду, что вы хотите отправлять сообщения асинхронно (поэтому сообщение обрабатывается другой подпрограммой go, чем выполняется ваш основной код).
Если это так, то очень простое изменение вашего первоначального примера даст вам следующее:
for i := 0; i < 5; i++ {
text := fmt.Sprintf("this is msg #%d!", i)
token := c.Publish("logs", 0, false, text)
// comment out... token.Wait()
}
Примечание. Код вашего примера может завершиться до того, как сообщения будут на самом делепослал;добавление time.Sleep (10 * time.Second) дало бы ему время, чтобы они вышли;см. код ниже для другого способа обработки этого
Единственная причина, по которой ваш исходный код остановился до отправки сообщения, заключалась в том, что вы вызвали token.Wait (). Если вас не волнуют ошибки (и вы не проверяете их, поэтому я полагаю, что вас это не волнует), тогда нет смысла вызывать token.Wait () (он просто ждет, пока сообщение отправлено; сообщение исчезнет)вызываете ли вы token.Wait () или нет).
Если вы хотите регистрировать любые ошибки, вы можете использовать что-то вроде:
for i := 0; i < 5; i++ {
text := fmt.Sprintf("this is msg #%d!", i)
token := c.Publish("logs", 0, false, text)
go func(){
token.Wait()
err := token.Error()
if err != nil {
fmt.Printf("Error: %s\n", err.Error()) // or whatever you want to do with your error
}
}()
}
Обратите внимание, что есть еще несколько вещей, которые вынужно сделать, если доставка сообщения критична (но поскольку вы не проверяете ошибки, я предполагаю, что это не так).
С точки зрения кода, который вы нашли;Я подозреваю, что это добавит сложности, которая вам не нужна (и для ее решения потребуется дополнительная информация; например, структура MqttProtocol не определена в вставленном вами бите).
Дополнительный бит ... В вашемкомментарии, которые вы упомянули "Опубликованные сообщения должны быть заказаны". Если это важно (поэтому вы хотите подождать, пока каждое сообщение не будет доставлено, прежде чем отправлять другое), вам нужно что-то вроде:
msgChan := make(chan string, 200) // Allow a queue of up to 200 messages
var wg sync.WaitGroup
wg.Add(1)
go func(){ // go routine to send messages from channel
for msg := range msgChan {
token := c.Publish("logs", 2, false, msg) // Use QOS2 is order is vital
token.Wait()
// should check for errors here
}
wg.Done()
}()
for i := 0; i < 5; i++ {
text := fmt.Sprintf("this is msg #%d!", i)
msgChan <- text
}
close(msgChan) // this will stop the goroutine (when all messages processed)
wg.Wait() // Wait for all messages to be sent before exiting (may wait for ever is mqtt broker down!)
Примечание: это похоже на решение от Ильи Казначеева (если вы установите параметру workerPoolSize значение 1 и сделаете канал буферизованным)
Поскольку ваши комментарии указывают на то, что группа ожидания делает это трудным для понимания, это еще один способ ожидания, который может быть более понятным (группы ожидания обычно используются, когдавы ожидаете, что несколько вещей станут финскими; в этом примере мы ждем только одного, поэтому можно использовать более простой подход)
msgChan := make(chan string, 200) // Allow a queue of up to 200 messages
done := make(chan struct{}) // channel used to indicate when go routine has finnished
go func(){ // go routine to send messages from channel
for msg := range msgChan {
token := c.Publish("logs", 2, false, msg) // Use QOS2 is order is vital
token.Wait()
// should check for errors here
}
close(done) // let main routine know we have finnished
}()
for i := 0; i < 5; i++ {
text := fmt.Sprintf("this is msg #%d!", i)
msgChan <- text
}
close(msgChan) // this will stop the goroutine (when all messages processed)
<-done // wait for publish go routine to complete