Go -micro rabbit mq plugin - publi sh сообщение с приоритетом - PullRequest
1 голос
/ 12 марта 2020

Поскольку поддерживаются приоритетные очереди RabbitMQ версии 3.5.0 - https://www.rabbitmq.com/priority.html

Очередь может быть объявлена, если при создании очереди передан аргумент x-max-priority.

Я могу успешно объявить очередь с поддержкой приоритетов

brkrSub := broker.NewSubscribeOptions(
        broker.DisableAutoAck(),
        rabbitmq.QueueArguments(map[string]interface{}{"x-max-priority": 10}),
    )

    service.Server().Subscribe(
        service.Server().NewSubscriber(
            "mytopic",
            h.Handle,
            server.SubscriberQueue("mytopic.hello"),
            server.SubscriberContext(brkrSub.Context),
        ),
    )

Но как мне опубликовать sh сообщение с указанием приоритета?

    body := &message.MyTestMessage{
        Message: fmt.Sprintf("Message number %d", counter),
    }

    msg := client.NewMessage(
        topic,
        body,
        // TODO: Priority
    )
    if err := client.Publish(ctx, msg); err != nil {
        fmt.Printf("Cannot publish message: ", err.Error())
        return
    }

Я не смог найти прямой способ передачи Priority как MessageOption или PublishOption, однако, кажется, что есть способ указать дополнительные параметры в client.Publi sh context. Я смотрю в правильном направлении, и если да, можете ли вы мне немного помочь?

Редактировать : я смог сделать следующее, не вызывая ошибок во время компиляции. Приоритет все еще игнорируется, и сообщения приходят в обычном порядке.


func setPriority(ctx context.Context, priority int) client.PublishOption {
    return func(o *client.PublishOptions) {
        o.Context = context.WithValue(ctx, "priority", priority)
    }
}

func publish(ctx context.Context, priority int, counter int) {
    //body := fmt.Sprintf("hello, I am a message %d", counter)
    body := &message.MyTestMessage{
        Message: fmt.Sprintf("Message number %d", counter),
    }

    msg := client.NewMessage(
        topic,
        body,
    )
    if err := client.Publish(ctx, msg, setPriority(ctx, priority)); err != nil {
        fmt.Printf("Cannot publish message: ", err.Error())
        return
    }

    fmt.Printf("Published message %d to %s \n", counter, topic)
}

1 Ответ

0 голосов
/ 12 марта 2020

Попробуйте что-то подобное:

func publishMessageToChan(queue *amqp.Queue, channel *amqp.Channel, messageToQueue string) error {
    return channel.Publish(
        "<exchange>", // exchange
        "<queue>",    // routing key
        false,        // mandatory
        false,        // immediate
        amqp.Publishing{
            Timestamp:   time.Now(),
            ContentType: "text/plain",
            Body:        []byte(messageToQueue),
            Priority:    0, // <-- Priority here < 0 to 9>
        })
}

С библиотекой "github.com/streadway/amqp"

...