Народ, я пытаюсь написать библиотеку оболочки для реализации rabbitmq. По какой-то странной причине я не могу использовать существующую очередь.
msgs, err := w.AMQP.Channel.Consume(
вызывает:
2019-10-14T13:58:56.462-0400 info worker/worker.go:27 [Initializing NewWorker]
2019-10-14T13:58:56.462-0400 debug worker/worker.go:43 Starting Worker
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x28 pc=0x14d024e]
Пример документов , который я использовал, прости интерфейс к драйверу rabbitmq прост. Я не понимаю, почему я получаю invalid memory address
ошибку.
Моя реализация идентична тому, как это изложено в статье, для справки, вот весь код, который у меня есть:
// StartConsumingTlcFHVDrivers subscribes to queue and starts to do it's job
func (w *Worker) StartConsuming() {
queueName := w.options.Rabbit.AMQP.QueueName
w.logger.Debugf("Starting %s Worker", queueName)
msgs, err := w.AMQP.Channel.Consume(
queueName,
"",
false,
false,
false,
false,
nil,
)
if err != nil {
logger.Fatalf("Could not register consumer, err:%s", err)
}
forever := make(chan bool)
go func() {
for d := range msgs {
w.logger.Infof("Received a message: %s", d.Body)
}
}()
w.logger.Infof("Waiting for messages on %s", queueName)
<-forever
}
Прежде чем я закончу обертку с моим собственным интерфейсом, я настраиваю клиента следующим образом:
import ""github.com/streadway/amqp""
type Client struct {
options Options
logger ilogger.Logger
metrics imetrics.Client
Channel amqp.Channel
}
func NewRabbitClient(logger ilogger.Logger, metrics imetrics.Client, options Options) *Client {
var err error
conn, err := amqp.Dial(options.AMQPConnectionURL)
if err != nil {
logger.Fatalf("%s: %s", "Can't connect to AMQP", err)
}
defer conn.Close()
// create a dedicated channel per queue
channel, err := conn.Channel()
if err != nil {
logger.Fatalf("%s: %s", "Can't create a amqpChannel", err)
}
defer channel.Close()
// declare an Exchange
err = channel.ExchangeDeclare(options.ExchangeName, options.ExchangeType, true, false, false, false, nil)
// declare a Queue
_, err = channel.QueueDeclare(options.QueueName, true, false, false, false, nil)
if err != nil {
logger.Fatalf("Could not declare %s queue, err:%s", options.QueueName, err)
}
// bind a Queue to the Exchange
err = channel.QueueBind(options.QueueName, "", options.ExchangeName, false, nil)
// Qos on the Channel
err = channel.Qos(1, 0, false)
if err != nil {
logger.Fatalf("Could not configure QoS, err:%s", err)
}
return &Client{
metrics: metrics,
logger: logger,
options: options,
Channel: *channel,
}
}