Общий способ обработки нескольких очередей с помощью библиотеки streadway - PullRequest
0 голосов
/ 11 января 2019

Я смотрю на обработку тысяч сообщений, поступающих из нескольких очередей RabbitMQ (от 5 до 10), и отправляю обработанные сообщения пакетным способом в ELK.

Каков наилучший общий способ обработки n очередей с помощью библиотеки streadway / amqp?

Что именно должно быть включено в каждую программу, в терминах amqp.Connection, amqp.Channel и amqp.Consume r.

В основном я вижу 3 дизайна:

A) 1 соединение - 1 канал - n потребителей

B) 1 соединение - n каналов - 1 потребитель

C) n Подключение - 1 канал - 1 Потребитель

А) у меня не работает:

Failed to register a consumer: Exception (504) Reason: "channel/connection is not open"

Каждая из этих программ будет затем буферизовать x сообщений и делать BatchRequest для ELK независимо от других.

Пока что запуск 1 соединения в очередь (C), кажется, работает, даже если мне приходится иметь дело с высоким потреблением памяти сервером. Это действительно самый эффективный дизайн или мне нужно сохранить 1 соединение на одного работника, обрабатывающего все 5-10 каналов?

Вот (С) с одним соединением на очередь.

func main() {
    queues := []string{"q1", "q2", "q3", "q4"}

    forever := make(chan bool)
    for _, queue := range queues {
        go processQueue(queue)
    }
    <-forever
}

func processQueue(name string) {
    conn, _ := amqp.Dial("amqp://guest:guest@localhost:5672/")
    defer conn.Close()

    ch, _ := conn.Channel()
    defer ch.Close()

    msgs, _ := ch.Consume(name, "test-dev", false, false, false, false, nil)
    go func() {
        for d := range msgs {
            log.Printf("[%s] %s", name, d.RoutingKey)
            d.Ack(true)
        }
    }()
}
...