Выборочный прием сообщений на основе атрибутов тела сообщения в RabbitMQ - PullRequest
0 голосов
/ 04 марта 2020

Допустим, у меня есть ситуация, когда мне нужно подождать до 1 минуты, чтобы выполнить какое-либо действие.

Если срок его действия истек, попробуйте другое действие.

Мой текущий Предложение решения основано на возможностях RabbitMQ.

Я бы создал следующие ресурсы:

@Bean
DirectExchange exchangeDirect() {
    return new DirectExchange("exchange.direct");
}

@Bean
Queue bufferQueue() {
    Map<String, Object> args = new HashMap<>();
    args.put("x-message-ttl", amqpProperties.getTimeToLive().toMillis());
    args.put("x-dead-letter-exchange", "exchange.direct");
    args.put("x-dead-letter-routing-key", "timedOutQueue");
    return new Queue("buffer.queue", true, false, false, args);
}

@Bean
Queue timedOutQueue() {
    return new Queue("timed.out.queue", true);
}

@Bean
Binding bufferQueueToExchangeDirect() {
    return bind(bufferQueue())
            .to(exchangeDirect())
            .with("buffer.queue");
}

@Bean
Binding timedOutQueueToExchangeDirect() {
    return bind(timedOutQueue())
            .to(exchangeDirect())
            .with("timed.out.queue");
}

Когда я добавляю действие к bufferQueue и не получаю никаких обновлений доставки в течение 1 минуты, этот запрос затем перемещается на timedOutQueue благодаря bufferQueue's TTL.

Я могу присоединить слушателя приложения кролика к timedOutQueue и использовать другое действие.

Когда я добавляю действие к bufferQueue и я получаю подтверждение того, что действие было успешно выполнено, я хотел бы удалить данное событие действия из bufferQueue.

Я не смог найти такую ​​функцию в RabbitMQ, т.е. возможность получать выборочно.

Я также нашел несколько статей, в которых говорится, что выборочное потребление является антипаттерном.

Можно ли выборочно потреблять сообщения из очереди RabbitMQ?

Как правильно реализовать этот метод? такое шаблон в RabbitMQ?

1 Ответ

1 голос
/ 04 марта 2020

В RabbitMQ отсутствует концепция выбора сообщений.

"Правильный" способ для приложения, которое хочет избирательно получать сообщения, - это использовать несколько очередей / ключей маршрутизации с потребителем для каждого указанного c очередь, в которой он проявляет интерес.

Однако нет способа «удалить» сообщение из середины очереди; только голова.

Когда я добавляю действие в bufferQueue и получаю подтверждение того, что действие было успешно выполнено, я хочу удалить данное событие действия из bufferQueue.

Это не имеет смысла для меня; когда сообщение истекло в bufferQueue из-за TTL и было перемещено в timedOutQueue, оно больше не существует в bufferQueue, поэтому удалять нечего.

Также нет механизма для .. .

и я не получаю обновления доставки в течение 1 минуты,

..., поскольку каждое сообщение в очереди является независимым.

Не похоже, что ваше приложение вообще подходит для брокера сообщений.

...