Для воспроизведения ошибки (404) Reason: "NOT_FOUND - no queue
при подписке на rabbitmq очередей я использую следующий код для одновременного объявления и использования очередей:
package main
import (
"fmt"
"log"
"os"
"sync"
"time"
uuid "github.com/satori/go.uuid"
"github.com/streadway/amqp"
)
func exit1(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
rUSER := "bunny"
rPASS := "test"
rHOST := "my-rabbit"
rPORT := "5672"
rVHOST := "hole"
// read from ENV
if e := os.Getenv("RABBITMQ_USER"); e != "" {
rUSER = e
}
if e := os.Getenv("RABBITMQ_PASS"); e != "" {
rPASS = e
}
if e := os.Getenv("RABBITMQ_HOST"); e != "" {
rHOST = e
}
if e := os.Getenv("RABBITMQ_PORT"); e != "" {
rPORT = e
}
if e := os.Getenv("RABBITMQ_VHOST"); e != "" {
rVHOST = e
}
conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%s/%s",
rUSER, rPASS, rHOST, rPORT, rVHOST))
exit1(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
exit1(err, "Failed to open a channel")
defer ch.Close()
// buggy part
args := map[string]interface{}{
"x-message-ttl": int32(3000),
"x-expires": int32(8000), // <-- culprit
}
concurrent := 500
wg := sync.WaitGroup{}
semaphore := make(chan struct{}, concurrent)
for i := 0; i < 1000; i++ {
semaphore <- struct{}{}
wg.Add(1)
go func() {
queueName := fmt.Sprintf("carrot-%s-%s", time.Now().Format("2006-01-02"), uuid.Must(uuid.NewV4()))
fmt.Printf("Creating queue: %s\n", queueName)
defer func() {
<-semaphore
wg.Done()
}()
q, err := ch.QueueDeclare(
queueName,
false, // durable
false, // delete when usused
false, // exclusive
false, // no-wait
args, // arguments
)
exit1(err, "Failed to declare a queue")
// how to measure here time elapsed between ch.Consume is called
_, err = ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
exit1(err, "Failed to register a consumer")
}()
}
wg.Wait()
}
Предоставление дополнительного контекста, кодабудет работать нормально и подписываться на все очереди 1000
при наличии нескольких одновременных клиентов < 100
, но при добавлении большего числа одновременных клиентов возникает своего рода «состояние гонки», и клиенты начинают получать ошибку 404
, это происходит из-заобъявление TTL для очереди, в данном случае 8 секунд:
"x-expires": int32(8000),
Лучшим решением было бы использование исключительных очередей, в противном случае очередь удаляется до того, как клиент сможет ее использовать, но в этом »глючный код, я хотел бы измерить задержку между ch.QueueDeclare
и ch.Consume
.
Клиент в основном делает:
q, err := ch.QueueDeclare(
queueName,
false, // durable
false, // delete when usused
false, // exclusive
false, // no-wait
args, // arguments
)
А затем:
_, err = ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
При наличии нескольких одновременных клиентов после выполнения QueueDeclare
возникает задержка, которая достигает до 8s
секунд до вызова Consume
ииз-за этого ошибка 404
но что и как я мог бы приспособить, приспособить к коду для измерения этой задержки?