Я использую рекомендующую библиотеку streadyway / amqp для взаимодействия с RabbitMQ внутри Go.
Я хочу, чтобы моя служба не работала, если не может подключиться к RabbitMQ Это означает, что он не может подключиться, ждет n
секунд и пытается восстановить соединение (цикл навсегда).
Я почти уверен, что мне нужно использовать метод Channel.NotifyClose , но я не могу разобраться:
func (ch *Channel) NotifyClose(c chan *Error) chan *Error
NotifyClose регистрирует прослушиватель, когда сервер отправляет исключение канала или соединения в виде метода Connection.Close или Channel.Close. Исключения подключения будут транслироваться на все открытые каналы, и все каналы будут закрыты, а исключения канала будут транслироваться только для слушателей этого канала.
Предоставленный канал будет закрыт при закрытии канала и при корректном закрытии сообщение об ошибке не будет отправлено.
Это то, что я пытался:
graceful := make(chan *amqp.Error)
errs := channel.NotifyClose(graceful)
for {
case <-graceful:
fmt.Println("Graceful close!")
reconnect()
case <-errs:
fmt.Println("Not graceful close")
reconnect()
}
Иногда это работает! В противном случае после повторного подключения он будет многократно распечатывать:
2018/11/04 15:29:26 Other close
2018/11/04 15:29:26 Connected!
2018/11/04 15:29:26 Graceful close!
2018/11/04 15:29:26 Connected!
2018/11/04 15:29:26 Other close
2018/11/04 15:29:26 Connected!
2018/11/04 15:29:26 Graceful close!
...
Очень быстро.
Я хочу иметь возможность запускать службу в одном терминале, а кролик - в другом. Я должен иметь возможность останавливать и перезапускать кролика в любое время, когда служба постоянно подключается.
Я немного запутался в методе NotifyClose
- c
chan просто закрывается при закрытии соединения? Почему он возвращает другой канал?
Приветствие.
Весь мой код. У него нет функции push
или pop
, потому что это минимальный пример демонстрации повторного подключения при сбое подключения. Реализация push и pop будет зависеть от того, как осуществляется переподключение.
Любые комментарии к обзору кода также приветствуются.
package main
import (
"github.com/streadway/amqp"
"io"
"log"
"sync"
"time"
)
// RabbitMQ ...
type RabbitMQ struct {
Logger *log.Logger
IsConnected bool
addr string
name string
connection *amqp.Connection
channel *amqp.Channel
queue *amqp.Queue
wg *sync.WaitGroup
done chan bool
}
const retryDelay = 5 * time.Second
// NewQueue creates a new queue instance.
func NewQueue(logOut io.Writer, name string, addr string) *RabbitMQ {
rabbit := RabbitMQ{
IsConnected: false,
addr: addr,
name: name,
wg: new(sync.WaitGroup),
done: make(chan bool),
Logger: log.New(logOut, "", log.LstdFlags),
}
rabbit.wg.Add(1)
rabbit.Connect()
go rabbit.reconnect()
return &rabbit
}
// reconnect waits to be notified about a connection
// error, and then attempts to reconnect to RabbitMQ.
func (rabbit *RabbitMQ) reconnect() {
defer rabbit.wg.Done()
graceful := make(chan *amqp.Error)
errs := rabbit.channel.NotifyClose(graceful)
for {
select {
case <-rabbit.done:
return
case <-graceful:
graceful = make(chan *amqp.Error)
rabbit.Logger.Println("Graceful close!")
rabbit.IsConnected = false
rabbit.Connect()
rabbit.IsConnected = true
errs = rabbit.channel.NotifyClose(graceful)
case <-errs:
graceful = make(chan *amqp.Error)
rabbit.Logger.Println("Normal close")
rabbit.IsConnected = false
rabbit.Connect()
errs = rabbit.channel.NotifyClose(graceful)
}
}
}
// Connect will block until a new connection to
// RabbitMQ is formed.
func (rabbit *RabbitMQ) Connect() {
for {
conn, err := amqp.Dial(rabbit.addr)
if err != nil {
rabbit.Logger.Println("Failed to establish connection")
time.Sleep(retryDelay)
continue
}
ch, err := conn.Channel()
if err != nil {
rabbit.Logger.Println("Failed to create a channel")
time.Sleep(retryDelay)
continue
}
queue, err := ch.QueueDeclare(
name,
false, // Durable
false, // Delete when unused
false, // Exclusive
false, // No-wait
nil, // Arguments
)
if err != nil {
rabbit.Logger.Println("Failed to publish a queue")
time.Sleep(retryDelay)
continue
}
rabbit.Logger.Println("Connected!")
rabbit.IsConnected = true
rabbit.connection = conn
rabbit.channel = ch
rabbit.queue = &queue
return
}
}
// Close the connection to RabbitMQ and stop
// checking for reconnections.
func (rabbit *RabbitMQ) Close() error {
close(rabbit.done)
rabbit.wg.Wait()
return rabbit.connection.Close()
}
И как это используется:
package main
import (
"fmt"
"os"
)
const (
name = "job_queue"
addr = "amqp://guest:guest@localhost:5672/"
)
func main() {
fmt.Println("Starting...")
NewQueue(os.Stdout, name, addr)
for {}
}