У меня есть две очереди SQS: одна предназначена для сообщений с низким приоритетом, а другая - для сообщений с высоким приоритетом. Логика c предназначена для того, чтобы не трогать сообщения в очереди с низким приоритетом, если очередь с высоким приоритетом не пуста.
SuspendLowPriorityRoutePolicy suspendLowPriorityRoutePolicy = new SuspendLowPriorityRoutePolicy(LOW_PRIORITY_ROUTE_ID, camelContext);
from(UriBuilder.buildSqsUri(sqsProperties)).routeId(LOW_PRIORITY_ROUTE_ID)
.log(LoggingLevel.INFO, log, "NON-PRIORITY: ${body}");
from(UriBuilder.buildPrioritySqsUri(sqsProperties)).routeId(HIGH_PRIORITY_ROUTE_ID)
.routePolicy(suspendLowPriorityRoutePolicy)
.log(LoggingLevel.INFO, log, "PRIORITY: ${body}");
Оба этих пользователя имеют свойство concurrentConsumers
, установленное в 1, что означает, что они будут обрабатывать одно сообщение за раз.
Прямо сейчас у меня есть эти два маршрута, настроенные на потреблять сообщения из очередей одновременно. Я хотел бы, чтобы сообщение, поступающее на высокоприоритетный маршрут, инициировало прекращение низкоприоритетного маршрута. Чтобы попытаться получить эту функцию, я попытался использовать политику маршрутизации, которая останавливает очередь с низким приоритетом при запуске нового обмена на маршруте с высоким приоритетом:
(фрагмент из SuspendLowPriorityRoutePolicy
)
@Override
public void onExchangeBegin(Route route, Exchange exchange) {
Route lowPriorityRoute = context.getRoute(lowPriorityRouteId);
ServiceStatus routeStatus = context.getRouteStatus(lowPriorityRouteId);
if (!routeStatus.isStopped()) {
try {
lock.lock();
log.info("High priority request came in, stopping consumer");
stopConsumer(lowPriorityRoute.getConsumer());
} catch (Exception e) {
log.error("Exception stopping consumer " + e);
handleException(e);
} finally {
lock.unlock();
}
}
}
Однако я не уверен, как go о перезапуске потребителя с низким приоритетом. Другие ловушки, предоставляемые Camel RoutePolicy
, позволяют переопределять onExchangeDone
, но в этом случае logi c должен перезапускать потребителя с низким приоритетом, только если очереди с высоким приоритетом пусты. Я не думаю, что есть способ проверить, пуста ли очередь, мы могли бы проверить атрибут ApproximateNumberOfMessages
в обработчике обмена, но это может быть неточным.
Другая мысль могла бы иметь запланированный фон модуль проверки, который проверяет inFlightRequestsRepository CamelContext
и перезапускает очереди с низким приоритетом, только если маршрут с высоким приоритетом не имеет внутренних запросов.