golang rabbitmq channel.consume SIGSEGV - PullRequest
       11

golang rabbitmq channel.consume SIGSEGV

0 голосов
/ 14 октября 2019

Народ, я пытаюсь написать библиотеку оболочки для реализации rabbitmq. По какой-то странной причине я не могу использовать существующую очередь.

msgs, err := w.AMQP.Channel.Consume( вызывает:

2019-10-14T13:58:56.462-0400    info    worker/worker.go:27     [Initializing NewWorker]
2019-10-14T13:58:56.462-0400    debug   worker/worker.go:43     Starting Worker
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x28 pc=0x14d024e]

Пример документов , который я использовал, прости интерфейс к драйверу rabbitmq прост. Я не понимаю, почему я получаю invalid memory address ошибку.

Моя реализация идентична тому, как это изложено в статье, для справки, вот весь код, который у меня есть:

// StartConsumingTlcFHVDrivers subscribes to queue and starts to do it's job
func (w *Worker) StartConsuming() {
    queueName := w.options.Rabbit.AMQP.QueueName
    w.logger.Debugf("Starting %s Worker", queueName)

    msgs, err := w.AMQP.Channel.Consume(
        queueName,
        "",
        false,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        logger.Fatalf("Could not register consumer, err:%s", err)
    }

    forever := make(chan bool)

    go func() {
        for d := range msgs {
            w.logger.Infof("Received a message: %s", d.Body)
        }
    }()

    w.logger.Infof("Waiting for messages on %s", queueName)
    <-forever
}

Прежде чем я закончу обертку с моим собственным интерфейсом, я настраиваю клиента следующим образом:

import ""github.com/streadway/amqp""

type Client struct {
    options Options
    logger  ilogger.Logger
    metrics imetrics.Client
    Channel amqp.Channel
}

func NewRabbitClient(logger ilogger.Logger, metrics imetrics.Client, options Options) *Client {
    var err error
    conn, err := amqp.Dial(options.AMQPConnectionURL)
    if err != nil {
        logger.Fatalf("%s: %s", "Can't connect to AMQP", err)
    }
    defer conn.Close()

    // create a dedicated channel per queue
    channel, err := conn.Channel()
    if err != nil {
        logger.Fatalf("%s: %s", "Can't create a amqpChannel", err)
    }
    defer channel.Close()

    // declare an Exchange
    err = channel.ExchangeDeclare(options.ExchangeName, options.ExchangeType, true, false, false, false, nil)

    // declare a Queue
    _, err = channel.QueueDeclare(options.QueueName, true, false, false, false, nil)
    if err != nil {
        logger.Fatalf("Could not declare %s queue, err:%s", options.QueueName, err)
    }

    // bind a Queue to the Exchange
    err = channel.QueueBind(options.QueueName, "", options.ExchangeName, false, nil)

    // Qos on the Channel
    err = channel.Qos(1, 0, false)
    if err != nil {
        logger.Fatalf("Could not configure QoS, err:%s", err)
    }

    return &Client{
        metrics: metrics,
        logger:  logger,
        options: options,
        Channel: *channel,
    }
}

1 Ответ

0 голосов
/ 14 октября 2019

Ошибка теперь очевидна для меня. В функции конструктора не закрывайте соединение / канал. т.е.

    conn, err := amqp.Dial(options.AMQPConnectionURL)
    if err != nil {
        logger.Fatalf("%s: %s", "Can't connect to AMQP", err)
    }

вместо

    conn, err := amqp.Dial(options.AMQPConnectionURL)
    if err != nil {
        logger.Fatalf("%s: %s", "Can't connect to AMQP", err)
    }
    defer conn.Close()
    ```
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...