Временная пауза RabbitMQ Потребитель - PullRequest
0 голосов
/ 04 апреля 2019

Я пишу потребитель для RabbitMQ с Go, который должен приостановить потребление сообщений на некоторое время, а затем восстановиться, чтобы получать сообщения из очереди снова.При чтении документации https://godoc.org/github.com/streadway/amqp я не смог определить механизм, который мне нужно реализовать в моем коде.

Возможно ли это сделать?Есть пример?

Фрагмент моего кода:

rabbitMQMessages, err = ch.Consume(
        "TestQ",
        "testConsumer",
        false,
        true,
        false,
        false,
        nil,
    )
    failOnError(err, "Failed to register a consumer")

    forever := make(chan bool)

    go func() {
        select {
        case d := <-rabbitMQMessages: // Cheking if messge was recieved
            log.Printf("Received a message: %s", d.Body)
            dotcount := bytes.Count(d.Body, []byte("."))

            err = ch.Flow(false) // Returns error: Exception (540) Reason: "NOT_IMPLEMENTED - active=false
            failOnError(err, "Failed to close channel")

            t := time.Duration(dotcount)
            time.Sleep(t * time.Second)
            log.Printf("Done")

            err = ch.Flow(true)

            d.Ack(false)
        default:
            log.Println("Default section")
        }
    }()

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    <-forever

Ответы [ 2 ]

0 голосов
/ 06 апреля 2019

Я смог понять это.Мне нужно Close соединение, а затем снова открыть его.Это предотвращает чтение сообщений заранее.Не уверен, что это правильно, но у меня это сработало.Добавление фрагмента моего тестового кода.

func main() {
    var rabbitMQMessages <-chan amqp.Delivery
    var err error
    var rabbitMQ RabbitMQ

    err = rabbitMQ.dial()
    failOnError(err, "Failed to connect to RabbitMQ")
    defer rabbitMQ.Close()

    err = rabbitMQ.setUpChannel()
    failOnError(err, "Failed to open a channel")

    err = rabbitMQ.Consumme()
    failOnError(err, "Failed to consume")

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")

    rabbitMQMessages = rabbitMQ.GetChan()

    for {
        select {
        case d, ok := <-rabbitMQMessages: // Cheking if messge was recieved
            log.Printf("Chan status at start of function %t", ok)

            if !ok {
                err = rabbitMQ.setUpChannel()
                failOnError(err, "Unable to open channel")
                defer rabbitMQ.Close()

                err = rabbitMQ.Consumme()
                failOnError(err, "Recover. Failed to register a consumer")

                rabbitMQMessages = rabbitMQ.GetChan()

                continue
            }

            log.Printf("Chan status at later of function %t", ok)

            log.Printf("Received a message: %s", d.Body)
            dotcount := bytes.Count(d.Body, []byte("."))
            d.Ack(false)

            err = rabbitMQ.CloseChannel()
            failOnError(err, "Failed to close channel")
            t := time.Duration(dotcount)
            time.Sleep(t * time.Second)
            log.Printf("Done")
        }
    }

}

0 голосов
/ 04 апреля 2019

Вы должны отменить потребителя , а затем повторно запустить ch.Consume в тот момент, когда вы хотите возобновить использование сообщений.


ПРИМЕЧАНИЕ: команда RabbitMQ контролирует список рассылки rabbitmq-users и только иногда отвечает на вопросы в StackOverflow.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...