Элегантный способ настроить высокоприоритетных / низкоприоритетных потребителей маршрутов sqs с Camel RoutePolicy? - PullRequest
0 голосов
/ 14 января 2020

У меня есть две очереди SQS: одна предназначена для сообщений с низким приоритетом, а другая - для сообщений с высоким приоритетом. Логика c предназначена для того, чтобы не трогать сообщения в очереди с низким приоритетом, если очередь с высоким приоритетом не пуста.

SuspendLowPriorityRoutePolicy suspendLowPriorityRoutePolicy = new SuspendLowPriorityRoutePolicy(LOW_PRIORITY_ROUTE_ID, camelContext);

        from(UriBuilder.buildSqsUri(sqsProperties)).routeId(LOW_PRIORITY_ROUTE_ID)
                .log(LoggingLevel.INFO, log, "NON-PRIORITY: ${body}");

        from(UriBuilder.buildPrioritySqsUri(sqsProperties)).routeId(HIGH_PRIORITY_ROUTE_ID)
                .routePolicy(suspendLowPriorityRoutePolicy)
                .log(LoggingLevel.INFO, log, "PRIORITY: ${body}");

Оба этих пользователя имеют свойство concurrentConsumers, установленное в 1, что означает, что они будут обрабатывать одно сообщение за раз.

Прямо сейчас у меня есть эти два маршрута, настроенные на потреблять сообщения из очередей одновременно. Я хотел бы, чтобы сообщение, поступающее на высокоприоритетный маршрут, инициировало прекращение низкоприоритетного маршрута. Чтобы попытаться получить эту функцию, я попытался использовать политику маршрутизации, которая останавливает очередь с низким приоритетом при запуске нового обмена на маршруте с высоким приоритетом:

(фрагмент из SuspendLowPriorityRoutePolicy)

 @Override
    public void onExchangeBegin(Route route, Exchange exchange) {
        Route lowPriorityRoute = context.getRoute(lowPriorityRouteId);
        ServiceStatus routeStatus = context.getRouteStatus(lowPriorityRouteId);
        if (!routeStatus.isStopped()) {
            try {
                lock.lock();
                log.info("High priority request came in, stopping consumer");
                stopConsumer(lowPriorityRoute.getConsumer());
            } catch (Exception e) {
                log.error("Exception stopping consumer " + e);
                handleException(e);
            } finally {
                lock.unlock();
            }
        }
    }

Однако я не уверен, как go о перезапуске потребителя с низким приоритетом. Другие ловушки, предоставляемые Camel RoutePolicy, позволяют переопределять onExchangeDone, но в этом случае logi c должен перезапускать потребителя с низким приоритетом, только если очереди с высоким приоритетом пусты. Я не думаю, что есть способ проверить, пуста ли очередь, мы могли бы проверить атрибут ApproximateNumberOfMessages в обработчике обмена, но это может быть неточным.

Другая мысль могла бы иметь запланированный фон модуль проверки, который проверяет inFlightRequestsRepository CamelContext и перезапускает очереди с низким приоритетом, только если маршрут с высоким приоритетом не имеет внутренних запросов.

1 Ответ

1 голос
/ 16 января 2020

Ну, самый простой способ, который я могу придумать, - это потреблять сообщения с высоким приоритетом гораздо большим количеством потребителей, чем сообщения с низким приоритетом . Таким образом, сообщения с высоким приоритетом обычно потребляются очень быстро, в то время как сообщения с низким приоритетом могут накапливаться и, следовательно, должны ждать.

Однако потребители с низким приоритетом по-прежнему будут обрабатывать сообщения , независимо от того, сколько сообщений с более высоким приоритетом обрабатывается одновременно.


Немного ближе к вашему варианту использования - использование приоритеты JMS .

Вы можете использовать обе очереди одинаково и просто установить различный приоритет JMS для сообщения каждой очереди . Например, стандартный приоритет 4 для сообщений с низким приоритетом и приоритет 8 для сообщений с высоким приоритетом.

Затем вы пересылаете сообщения обоих потребителей в третью очередь . Таким образом, в этой третьей очереди сообщения с высоким и низким приоритетом поступают смешанными, но каждое сообщение с соответствующим установленным приоритетом.

Затем вы присоединяете потребителя к третьей очереди , и из-за приоритетов сообщений сначала должны использоваться сообщения с высоким приоритетом . Сообщения с низким приоритетом потребляются, только если очередь не содержит сообщений с высоким приоритетом.

Однако следует помнить о некоторых вещах:

  • Использование приоритета сообщений изменяет порядок сообщений в очереди по мере необходимости. Это может замедлить работу очереди с большим количеством сообщений.
  • Вероятно, приоритет сообщения должен быть активирован в вашем брокере или очереди . В противном случае приоритеты игнорируются. Найдите пример с вашим брокером, чтобы проверить это.
  • Я не уверен, но я думаю, что есть также некоторые флаги в соединении JMS для соблюдения приоритетов для потребителя
  • Вы должны поддерживать низкий уровень предварительной выборки для потребителя . Если ваш потребитель предварительно выбирает 1000 сообщений с низким приоритетом, он обрабатывает их все, прежде чем делать что-либо еще. Неважно, если в это время в очередь поступает сообщение с высоким приоритетом. Чем больше предварительная выборка, тем больше сообщений с низким приоритетом может быть обработано до обработки вновь поступившего сообщения высокого уровня.
...