Переподключение к RabbitMQ после отключения - PullRequest
0 голосов
/ 04 ноября 2018

Я использую рекомендующую библиотеку streadyway / amqp для взаимодействия с RabbitMQ внутри Go.

Я хочу, чтобы моя служба не работала, если не может подключиться к RabbitMQ Это означает, что он не может подключиться, ждет n секунд и пытается восстановить соединение (цикл навсегда).

Я почти уверен, что мне нужно использовать метод Channel.NotifyClose , но я не могу разобраться:

func (ch *Channel) NotifyClose(c chan *Error) chan *Error

NotifyClose регистрирует прослушиватель, когда сервер отправляет исключение канала или соединения в виде метода Connection.Close или Channel.Close. Исключения подключения будут транслироваться на все открытые каналы, и все каналы будут закрыты, а исключения канала будут транслироваться только для слушателей этого канала.

Предоставленный канал будет закрыт при закрытии канала и при корректном закрытии сообщение об ошибке не будет отправлено.

Это то, что я пытался:

graceful := make(chan *amqp.Error)
errs := channel.NotifyClose(graceful)
for {
    case <-graceful:
        fmt.Println("Graceful close!")
        reconnect()
    case <-errs:
        fmt.Println("Not graceful close")
        reconnect()
}

Иногда это работает! В противном случае после повторного подключения он будет многократно распечатывать:

2018/11/04 15:29:26 Other close
2018/11/04 15:29:26 Connected!
2018/11/04 15:29:26 Graceful close!
2018/11/04 15:29:26 Connected!
2018/11/04 15:29:26 Other close
2018/11/04 15:29:26 Connected!
2018/11/04 15:29:26 Graceful close!
...

Очень быстро.

Я хочу иметь возможность запускать службу в одном терминале, а кролик - в другом. Я должен иметь возможность останавливать и перезапускать кролика в любое время, когда служба постоянно подключается.

Я немного запутался в методе NotifyClose - c chan просто закрывается при закрытии соединения? Почему он возвращает другой канал?

Приветствие.


Весь мой код. У него нет функции push или pop, потому что это минимальный пример демонстрации повторного подключения при сбое подключения. Реализация push и pop будет зависеть от того, как осуществляется переподключение.

Любые комментарии к обзору кода также приветствуются.

package main

import (
    "github.com/streadway/amqp"
    "io"
    "log"
    "sync"
    "time"
)

// RabbitMQ ...
type RabbitMQ struct {
    Logger      *log.Logger
    IsConnected bool
    addr        string
    name        string
    connection  *amqp.Connection
    channel     *amqp.Channel
    queue       *amqp.Queue
    wg          *sync.WaitGroup
    done        chan bool
}

const retryDelay = 5 * time.Second

// NewQueue creates a new queue instance.
func NewQueue(logOut io.Writer, name string, addr string) *RabbitMQ {
    rabbit := RabbitMQ{
        IsConnected: false,
        addr:        addr,
        name:        name,
        wg:          new(sync.WaitGroup),
        done:        make(chan bool),
        Logger:      log.New(logOut, "", log.LstdFlags),
    }
    rabbit.wg.Add(1)
    rabbit.Connect()
    go rabbit.reconnect()
    return &rabbit
}

// reconnect waits to be notified about a connection
// error, and then attempts to reconnect to RabbitMQ.
func (rabbit *RabbitMQ) reconnect() {
    defer rabbit.wg.Done()
    graceful := make(chan *amqp.Error)
    errs := rabbit.channel.NotifyClose(graceful)
    for {
        select {
        case <-rabbit.done:
            return
        case <-graceful:
            graceful = make(chan *amqp.Error)
            rabbit.Logger.Println("Graceful close!")
            rabbit.IsConnected = false
            rabbit.Connect()
            rabbit.IsConnected = true
            errs = rabbit.channel.NotifyClose(graceful)
        case <-errs:
            graceful = make(chan *amqp.Error)
            rabbit.Logger.Println("Normal close")
            rabbit.IsConnected = false
            rabbit.Connect()
            errs = rabbit.channel.NotifyClose(graceful)
        }
    }
}

// Connect will block until a new connection to
// RabbitMQ is formed.
func (rabbit *RabbitMQ) Connect() {
    for {
        conn, err := amqp.Dial(rabbit.addr)
        if err != nil {
            rabbit.Logger.Println("Failed to establish connection")
            time.Sleep(retryDelay)
            continue
        }
        ch, err := conn.Channel()
        if err != nil {
            rabbit.Logger.Println("Failed to create a channel")
            time.Sleep(retryDelay)
            continue
        }
        queue, err := ch.QueueDeclare(
            name,
            false, // Durable
            false, // Delete when unused
            false, // Exclusive
            false, // No-wait
            nil,   // Arguments
        )
        if err != nil {
            rabbit.Logger.Println("Failed to publish a queue")
            time.Sleep(retryDelay)
            continue
        }
        rabbit.Logger.Println("Connected!")
        rabbit.IsConnected = true
        rabbit.connection = conn
        rabbit.channel = ch
        rabbit.queue = &queue
        return
    }
}

// Close the connection to RabbitMQ and stop
// checking for reconnections.
func (rabbit *RabbitMQ) Close() error {
    close(rabbit.done)
    rabbit.wg.Wait()
    return rabbit.connection.Close()
}

И как это используется:

package main

import (
    "fmt"
    "os"
)

const (
    name = "job_queue"
    addr = "amqp://guest:guest@localhost:5672/"
)

func main() {
    fmt.Println("Starting...")
    NewQueue(os.Stdout, name, addr)
    for {}
}
...