Обрабатывать несколько сообщений amqp одновременно через одного потребителя внутри одной службы Spring-Rabbit - PullRequest
0 голосов
/ 22 января 2020

РЕДАКТИРОВАТЬ

Только что узнал, как запустить несколько потребителей в одном сервисе:

  @Bean
  SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames(RENDER_QUEUE);
    container.setConcurrentConsumers(concurrentConsumers); // setting this in env
    container.setMessageListener(listenerAdapter);
    return container;
  }

  @Bean
  MessageListenerAdapter listenerAdapter(RenderMessageConsumer receiver) {
    return new MessageListenerAdapter(receiver, "reciveMessageFromRenderQueue");
  }

Теперь остается только один вопрос: как я могу иметь глобальный лимит? Итак, как несколько экземпляров приемника AMQP делят общее количество потребителей? Поэтому я хочу установить глобальное число concurrentConsumers равным 10, запустить 2 экземпляра consumerSerivce и каждый экземпляр запустить около 5 потребителей. Может ли это управляться rabbitMq?


У меня есть служба Spring, которая потребляет сообщения AMQP и вызывает ресурс http для каждого сообщения. После завершения http-вызова вызывается другая очередь, сообщающая об ошибке или выполненная. Только после этого обработка сообщения будет завершена, и следующее сообщение будет извлечено из очереди.

  // simplified
  @RabbitListener(queues = RENDER_QUEUE)
  public void reciveMessageFromRenderQueue(String message) {
    try {
      RenderMessage renderMessage = JsonUtils.stringToObject(message, RenderMessage.class);
      String result = renderService.httpCallRenderer(renderMessage);
      messageProducer.sendDoneMessage(result);
    } catch (Exception e) {
      logError(type, e);
      messageProducer.sendErrorMessage(e.getMessage());
    }
  }

Иногда в очереди появляются сотни или тысячи сообщений рендеринга, но http-вызов выполняется довольно долго и мало что делает. Это становится очевидным, поскольку я могу улучшить скорость обработки сообщений, запустив несколько экземпляров службы, добавив тем самым больше потребителей и несколько раз вызывая конечную точку http. Один экземпляр имеет ровно одного потребителя для канала, поэтому количество экземпляров равно количеству потребителей. Однако это сильно увеличивает использование памяти (поскольку служба использует spring) для простой пересылки сообщения и обработки результата.

Поэтому я подумал, что я бы сделал http-вызов асинхронно и сразу же вернулся бы после принятия сообщения:

.httpCallRendererAsync(renderMessage)
    .subscribeOn(Schedulers.newThread())
    .subscribe(new Observer<String >() {
      public void onNext(String result) {
        messageProducer.sendDoneMessage(result);
      }
      public void onError(Throwable throwable) {
        messageProducer.sendErrorMessage(throwable.getMessage());
      }
    });

Это, однако, перегружает конечную точку http, которая не может обработать 1000 или более одновременных запросов.

Мне нужно, чтобы моя служба amqp принимала определенное количество сообщений из очереди, обрабатывать их в отдельных потоках, сделайте http-вызов в каждом из них и вернитесь с «обработано сообщение». Однако количество сообщений, взятых из очереди, необходимо разделить между несколькими экземплярами этой службы, поэтому, если максимальное значение равно 10, потребление сообщений выполняется в циклическом порядке, первые 5 нечетных сообщений должны обрабатываться первым экземпляром, а первые 5 четных сообщений - экземпляр 2, и как только один экземпляр заканчивает обработку сообщения, он должен взять другой из очереди.

Я обнаружил такие вещи, как предварительная выборка с ограничениями по потребителю и по каналу , как описано в rabbitmq . И реализация Spring-Rabbit, которая использует prefetchCount и транзакциюSize , описанную здесь . Это, однако, похоже, ничего не делает для одного работающего экземпляра. Он не будет создавать дополнительные потоки для одновременной обработки большего количества сообщений. И, конечно, это не уменьшит количество сообщений, обрабатываемых в моем сценарии asyn c, так как эти сообщения сразу считаются «обработанными».

  @Bean
  public RabbitListenerContainerFactory<SimpleMessageListenerContainer> prefetchContainerFactory(ConnectionFactory rabbitConnectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(rabbitConnectionFactory);
    factory.setPrefetchCount(5);
    factory.setTxSize(5);
    return factory;
  }

  // and then using
  @RabbitListener(queues = RENDER_QUEUE, containerFactory = "prefetchContainerFactory")

Наиболее важным требованием для меня, как мне кажется, является то, что несколько сообщений должен обрабатываться в одном экземпляре, в то время как максимальное количество одновременно обрабатываемых сообщений должно делиться между экземплярами. Можно ли это сделать с помощью rabbitMq и spring? Или я должен реализовать что-то среднее между ними.

На ранней стадии может быть приемлемым просто иметь одновременную обработку сообщений в одном экземпляре и не разделять это ограничение. Затем мне нужно будет настроить предел вручную, используя переменные окружения при масштабировании количества экземпляров.

1 Ответ

0 голосов
/ 22 января 2020

Теперь остается только один вопрос: как я могу иметь глобальный лимит? Итак, как несколько экземпляров приемника AMQP делят общее количество потребителей? Поэтому я хочу установить глобальное число concurrentConsumers равным 10, запустить 2 экземпляра consumerSerivce и каждый экземпляр запустить около 5 потребителей. Может ли это быть выполнено rabbitMq?

В RabbitMQ или Spring нет механизма для автоматической поддержки такого сценария. Однако вы можете изменить параллелизм во время выполнения (setConcurrentConsumers() для контейнера), чтобы использовать внешний агент для управления параллелизмом в каждом экземпляре.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...