Есть ли способ построить не прослушиватель для очереди с помощью файла конфигурации в AMQP - PullRequest
0 голосов
/ 19 октября 2018

Я опубликовал 50К объектов в определенную очередь.У меня есть один слушатель, который выбирает каждый объект и обрабатывает его.Но, очевидно, потребуется больше времени для обработки всех 50 тыс. Объектов.Поэтому я хочу разместить еще 3 слушателя, которые могут параллельно обрабатывать эти объекты.Для этого мне нужно написать еще два класса слушателя?с тем же кодом?это будет дубликат кода.Есть ли какой-нибудь подход, которым мы можем настроить количество желаемых слушателей, чтобы внутренне он создавал экземпляры для того же слушателя, чтобы справиться с нагрузкой? Может ли кто-нибудь помочь мне лучше выстроить еще 3 слушателя для обработки нагрузки, чтобы увеличить обработку.

==== Файл конфигурации Rabbit MQ кусок кода =============

@Bean
    public SubscriberGeneralQueue1 SubscriberGeneralQueue1(){
        return new SubscriberGeneralQueue1();
    }

@Bean
        public SimpleMessageListenerContainer rpcGeneralReplyMessageListenerContainer(ConnectionFactory connectionFactory,MessageListenerAdapter listenerAdapter1 ) {
            SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
            simpleMessageListenerContainer.setQueues(replyQueueRPC());
            simpleMessageListenerContainer.setTaskExecutor(taskExecutor());
            simpleMessageListenerContainer.setMessageListener(listenerAdapter1);
            simpleMessageListenerContainer.setMaxConcurrentConsumers(60);
            return simpleMessageListenerContainer;
        }
       @Bean
        @Qualifier("listenerAdapter1")
        MessageListenerAdapter listenerAdapter1(SubscriberGeneralQueue1 generalReceiver) {
            return new MessageListenerAdapter(generalReceiver, "receivegeneralQueueMessage");
        }

=== Код слушателя ================

@EnableRabbit
public class SubscriberGeneralQueue1 {

     /*@Autowired
        @Qualifier("asyncGeneralRabbitTemplate")
    private AsyncRabbitTemplate asyncGeneralRabbitTemplate;*/

    @Autowired
    private ExecutorService executorService;
    @Autowired
    private GeneralProcess generalProcess;

    List <RequestPojo> requestPojoGeneral = new ArrayList<RequestPojo>();

    @RabbitHandler
    @RabbitListener(containerFactory = "simpleMessageListenerContainerFactory", queues ="BulkSolve_GeneralrequestQueue")
    public void subscribeToRequestQueue(@Payload RequestPojo sampleRequestMessage, Message message) throws InterruptedException {

        long startTime=System.currentTimeMillis();

        //requestPojoGeneral.add(sampleRequestMessage);
        //System.out.println("List size issssss:" +requestPojoGeneral.size() );
        //generalProcess.processRequestObjectslist(requestPojoGeneral);
        generalProcess.processRequestObjects(sampleRequestMessage);

        System.out.println("message in general listener is:" + sampleRequestMessage.getDistance());
        System.out.println("Message payload is:" + sampleRequestMessage);
        System.out.println("Message payload1111 is:" + message );

        //return requestPojoGeneral;

    }

}

=== simplemessagelistenercontainerFactory configuration ===========

 @Bean
        public SimpleRabbitListenerContainerFactory simpleMessageListenerContainerFactory(ConnectionFactory connectionFactory,
                                                                                          SimpleRabbitListenerContainerFactoryConfigurer configurer) {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setTaskExecutor(taskExecutor());
            factory.setMaxConcurrentConsumers(60);
            configurer.configure(factory, connectionFactory);
            return factory;
        }

==== Предлагаемые изменения =====

@RabbitHandler
    @Async
    @RabbitListener(containerFactory = "simpleMessageListenerContainerFactory", queues ="BulkSolve_GeneralrequestQueue")
    public void subscribeToRequestQueue(@Payload RequestPojo sampleRequestMessage, Message message) throws InterruptedException {

        long startTime=System.currentTimeMillis();

        //requestPojoGeneral.add(sampleRequestMessage);
        //System.out.println("List size issssss:" +requestPojoGeneral.size() );
        //generalProcess.processRequestObjectslist(requestPojoGeneral);
        generalProcess.processRequestObjects(sampleRequestMessage);

        System.out.println("message in general listener is:" + sampleRequestMessage.getDistance());
        System.out.println("Message payload is:" + sampleRequestMessage);
        System.out.println("Message payload1111 is:" + message );

        //return requestPojoGeneral;

    }


}

конфигурация:

@Bean
        public SimpleRabbitListenerContainerFactory simpleMessageListenerContainerFactory(ConnectionFactory connectionFactory,
                                                                                          SimpleRabbitListenerContainerFactoryConfigurer configurer) {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setTaskExecutor(taskExecutor());
            factory.setMaxConcurrentConsumers(60);
            factory.setConsecutiveActiveTrigger(1);
            configurer.configure(factory, connectionFactory);
            return factory;
        }

  @Bean
        public SimpleMessageListenerContainer rpcGeneralReplyMessageListenerContainer(ConnectionFactory connectionFactory,MessageListenerAdapter listenerAdapter1 ) {
            SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
            simpleMessageListenerContainer.setQueues(replyQueueRPC());
            simpleMessageListenerContainer.setTaskExecutor(taskExecutor());
            simpleMessageListenerContainer.setMessageListener(listenerAdapter1);
            simpleMessageListenerContainer.setMaxConcurrentConsumers(100);
            simpleMessageListenerContainer.setConsecutiveActiveTrigger(1);
            return simpleMessageListenerContainer;
        }

1 Ответ

0 голосов
/ 19 октября 2018

Это можно сделать с помощью опции concurrency ListenerContainer:

Потоки из TaskExecutor, настроенные в SimpleMessageListenerContainer, используются для вызова MessageListener, когдаНовое сообщение доставлено Клиентом RabbitMQ.Если не настроено, используется SimpleAsyncTaskExecutor.Если используется объединенный исполнитель, убедитесь, что размер пула достаточен для обработки настроенного параллелизма.С DirectMessageListenerContainer MessageListener вызывается непосредственно в потоке клиента RabbitMQ.В этом случае taskExecutor используется для задачи, которая контролирует потребителей.

Пожалуйста, начните читать отсюда: https://docs.spring.io/spring-amqp/docs/current/reference/html/_reference.html#receiving-messages

А также см. Здесь: https://docs.spring.io/spring-amqp/docs/current/reference/html/_reference.html#containerAttributes

concurrentConsumers (параллелизм) - количество одновременных потребителей, которые первоначально запускаются для каждого слушателя.

ОБНОВЛЕНИЕ

Хорошо!Я вижу, что происходит.

У нас есть такой код:

 boolean receivedOk = receiveAndExecute(this.consumer); // At least one message received
                    if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {
                        if (receivedOk) {
                            if (isActive(this.consumer)) {
                                consecutiveIdles = 0;
                                if (consecutiveMessages++ > SimpleMessageListenerContainer.this.consecutiveActiveTrigger) {
                                    considerAddingAConsumer();
                                    consecutiveMessages = 0;
                                }
                            }
                        }

, поэтому мы проверяем возможный параллелизм только после обработки первого сообщения.Итак, в вашем случае это произойдет через 1 минуту.

Еще один флаг для considerAddingAConsumer() - это вариант consecutiveActiveTrigger со следующим по умолчанию:

private static final int DEFAULT_CONSECUTIVE_ACTIVE_TRIGGER = 10;

Итак,в вашем случае, чтобы разрешить параллелизацию только следующего сообщения, вы также должны настроить:

/**
 * If {@link #maxConcurrentConsumers} is greater then {@link #concurrentConsumers}, and
 * {@link #maxConcurrentConsumers} has not been reached, specifies the number of
 * consecutive cycles when a single consumer was active, in order to consider
 * starting a new consumer. If the consumer goes idle for one cycle, the counter is reset.
 * This is impacted by the {@link #txSize}.
 * Default is 10 consecutive messages.
 * @param consecutiveActiveTrigger The number of consecutive receives to trigger a new consumer.
 * @see #setMaxConcurrentConsumers(int)
 * @see #setStartConsumerMinInterval(long)
 * @see #setTxSize(int)
 */
public final void setConsecutiveActiveTrigger(int consecutiveActiveTrigger) {
    Assert.isTrue(consecutiveActiveTrigger > 0, "'consecutiveActiveTrigger' must be > 0");
    this.consecutiveActiveTrigger = consecutiveActiveTrigger;
}

на 1.Потому что 0 не будет работать в любом случае.

Для лучшей производительности вы также можете рассмотреть возможность сделать subscribeToRequestQueue() с @Async, чтобы действительно передать обработку из потока потребителя другому, чтобы избежатьэто 1 минута, чтобы ждать еще одного потребителя, чтобы начать.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...