Я смотрю на обработку тысяч сообщений, поступающих из нескольких очередей RabbitMQ (от 5 до 10), и отправляю обработанные сообщения пакетным способом в ELK.
Каков наилучший общий способ обработки n
очередей с помощью библиотеки streadway / amqp?
Что именно должно быть включено в каждую программу, в терминах amqp.Connection
, amqp.Channel
и amqp.Consume
r.
В основном я вижу 3 дизайна:
A) 1 соединение - 1 канал - n потребителей
B) 1 соединение - n каналов - 1 потребитель
C) n Подключение - 1 канал - 1 Потребитель
А) у меня не работает:
Failed to register a consumer: Exception (504) Reason: "channel/connection is not open"
Каждая из этих программ будет затем буферизовать x
сообщений и делать BatchRequest для ELK независимо от других.
Пока что запуск 1 соединения в очередь (C), кажется, работает, даже если мне приходится иметь дело с высоким потреблением памяти сервером. Это действительно самый эффективный дизайн или мне нужно сохранить 1 соединение на одного работника, обрабатывающего все 5-10 каналов?
Вот (С) с одним соединением на очередь.
func main() {
queues := []string{"q1", "q2", "q3", "q4"}
forever := make(chan bool)
for _, queue := range queues {
go processQueue(queue)
}
<-forever
}
func processQueue(name string) {
conn, _ := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()
ch, _ := conn.Channel()
defer ch.Close()
msgs, _ := ch.Consume(name, "test-dev", false, false, false, false, nil)
go func() {
for d := range msgs {
log.Printf("[%s] %s", name, d.RoutingKey)
d.Ack(true)
}
}()
}