Правильный ли Go / RabbitMQ способ «выкинуть» одно сообщение из очереди? - PullRequest
0 голосов
/ 08 декабря 2018

Первый вопрос, который у меня есть, это действительно вопрос дизайна.Я впервые пишу сервис, использующий очередь, и я также новичок в Go.Я пытаюсь определить, должен ли я писать своему работнику так, чтобы он просто выталкивал одно сообщение из очереди, обрабатывал его и умирал.С такими вещами, как Kubernetes, это кажется довольно тривиальным.

Или мне нужно, чтобы долгоживущий работник постоянно ждал новых сообщений, но он возобновляется, если он умирает (из-за ошибки или несчастного случая)?

Причина, по которой я задаю этот вопрос, заключается в том, что для того, чтобы реализовать первое, он выглядит немного более "взломанным", потому что я должен написать следующее, используя библиотеку common go AMQP из streadway/amqp (см. Комментарии):

// Pop will extract a message from the AMQP queue
func (v *Queue) Pop() (data []byte, err error) {
    msgs, err := v.Channel.Consume(
        v.QueueName, // queue
        v.ConsmerID, // consumer
        true,        // auto-ack
        false,       // exclusive
        false,       // no-local
        false,       // no-wait
        nil,         // args
    )
    if err != nil {
        return nil, err
    }

    // We have to use for .. range because Consume returns
    // "<-chan Delivery" but if we only want ONE message popped off
    // we return on the first one
    for data := range msgs {
        return data.Body, nil
    }

    // We should never get this far...
    return nil, errors.New("Something went wrong")
}

Кроме того, что в данном случае означает <-chan Delivery?Кажется, это своего рода «поток» или объект, к которому вы можете подключиться.Есть ли способ не писать цикл for для этих типов данных?

РЕДАКТИРОВАТЬ: я также обнаружил, что кажется, что этот код будет очередь из очереди ВСЕ, даже если он только делает цикл forитерация один раз (как показано в коде выше).Я тоже не уверен, почему это происходит?

Соответствующие ссылки на код:

1 Ответ

0 голосов
/ 08 декабря 2018

Чтобы просто взять один объект из <-chan Delivery, не используйте цикл range, а оператор канала <-:

data := <- msgs
return data.Body, nil

Что касаетсяпочему вся ваша очередь очищается, как только вы получаете одно сообщение: это, скорее всего, из-за предварительной выборки Consumer .При использовании сообщений клиент на самом деле извлекает их не из брокера один за другим, а партиями настраиваемого размера (если я правильно помню, порядка 32 или 64 сообщений по умолчанию).Как только брокер опубликует этот пакет сообщений для вашего потребителя, они будут в вашем msgs канале;и если вы больше не читаете с этого канала после получения первого сообщения, остальные из них исчезнут (по крайней мере, с включенным auto-ack - в противном случае они будут поставлены в очередь после закрытия канала).

Чтобы получать только одно сообщение за раз, используйте функцию QoS канала (с первым параметром, являющимся счетчиком предварительной выборки):

err := v.Channel.Qos(1, 0, false)
...