динамично меняются потребители - rabbitMQ - PullRequest
0 голосов
/ 25 апреля 2018

Я пытаюсь динамически менять потребителей. Но в какой-то момент число потребителей выходит за пределы максимальных - потребителей.код:

public class RabbitMQConfig extends AMQPConfig implements RabbitListenerConfigurer{

    @Autowired
    RabbitListenerEndpointRegistry endpoint; 
    static int temp=0;   
    SimpleMessageListenerContainer container;

    @Autowired
    private ConnectionFactory connectionFactory;

    @Autowired
    SimpleRabbitListenerContainerFactory  factory;

    @Override
    public void configureRabbitListeners(final RabbitListenerEndpointRegistrar registrar){
    registrar.setContainerFactory(rabbitListenerContainerFactory());
    registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
}

public AmqpTemplate getAmqpTemplate() {
    final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setMessageConverter ((MessageConverter)consumerJackson2MessageConverter());
    return rabbitTemplate;
}

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() 
{

    factory=new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(this.connectionFactory);
    factory.setConcurrentConsumers(15);
    factory.setMaxConcurrentConsumers(20);
    factory.setPrefetchCount(10);
    return factory;
}

public void  throttleConsumers(){

    Collection<MessageListenerContainer> containers= endpoint.getListenerContainers();
    SimpleMessageListenerContainer  eachSimpleMessageListenerContainer=null;
    for(MessageListenerContainer eachContainer : containers)
    {
    if(eachContainer.getClass().getName().equalsIgnoreCase("org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer")){
    eachSimpleMessageListenerContainer =(SimpleMessageListenerContainer)eachContainer;
    eachSimpleMessageListenerContainer.setConcurrentConsumers(12);
    }               
    }       
    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(this.connectionFactory);
    }

}

Потребитель:

@Component
public class Consumer {

@Autowired
private AmqpTemplate rabbitTemplate;

@Value("${rabbitmq.queue}")
String queueName;   

static long messageCount=0L;

@Autowired
RabbitMQConfig config;

    @RabbitListener(queues="rabbitmq.queue")
    public void receivedMessage(String message) throws IOException {

    messageCount++;
    System.out.println(message);

    if(messageCount>10000 && messageCount<15000 )
    {
            System.out.println("consumed: "+messageCount);
            try
            {
                Thread.sleep(1000);
            }
            catch(Exception e)
            {
                System.out.println("error sleeping:"+e.getMessage());
            }

    config.throttleConsumers();
    rabbitTemplate.convertAndSend(queueName,message);
    RestTemplate restTemplate = new RestTemplate();
String quote = restTemplate.getForObject("http://localhost:8080/rabbitmq-consumer/throttle", String.class);
      System.out.println(quote);

    }


}

}

В настоящее время мои минимальные потребители - 15, а максимальные - 20и я уменьшаю до 12, как только время отклика другой программы превышает 800. Пока мы тестируем, число потребителей растет до 29, что мы не можем понять.

Пожалуйста, помогите нам понять, куда мы идемнеправильно.

1 Ответ

0 голосов
/ 27 апреля 2018

Я только что проверил его с похожим кодом и не увидел проблем, если я изменил мин до того, как достиг максимума ...

@SpringBootApplication
public class So50014436Application {

    public static void main(String[] args) {
        SpringApplication.run(So50014436Application.class, args).close();
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template, AmqpAdmin admin,
            RabbitListenerEndpointRegistry registry) {
        return args -> {
            for (int i = 0; i < 1000; i++) {
                template.convertAndSend("so50014436", "foo");
            }
            ExecutorService exec = Executors.newSingleThreadExecutor();
            exec.execute(() -> {
                while (true) {
                    System.out.println(admin.getQueueProperties("so50014436"));
                    try {
                        Thread.sleep(5_000);
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            });
            Thread.sleep(30_000);
            System.out.println("Changing min to 12");
            registry.getListenerContainers().forEach(c -> {
                ((SimpleMessageListenerContainer) c).setConcurrentConsumers(12);
            });
            Thread.sleep(60_000);
            exec.shutdownNow();
        };
    }

    @RabbitListener(queues = "so50014436")
    public void listen(String in) throws Exception {
        Thread.sleep(1_000);
    }

    @Bean
    public Queue queue() {
        return new Queue("so50014436");
    }

}

и

spring.rabbitmq.listener.simple.concurrency=10
spring.rabbitmq.listener.simple.max-concurrency=15

и

{QUEUE_CONSUMER_COUNT=10, QUEUE_MESSAGE_COUNT=832, QUEUE_NAME=so50014436}
{QUEUE_CONSUMER_COUNT=10, QUEUE_MESSAGE_COUNT=978, QUEUE_NAME=so50014436}
{QUEUE_CONSUMER_COUNT=10, QUEUE_MESSAGE_COUNT=933, QUEUE_NAME=so50014436}
{QUEUE_CONSUMER_COUNT=11, QUEUE_MESSAGE_COUNT=883, QUEUE_NAME=so50014436}
{QUEUE_CONSUMER_COUNT=11, QUEUE_MESSAGE_COUNT=830, QUEUE_NAME=so50014436}
{QUEUE_CONSUMER_COUNT=12, QUEUE_MESSAGE_COUNT=774, QUEUE_NAME=so50014436}
Changing min to 12
{QUEUE_CONSUMER_COUNT=14, QUEUE_MESSAGE_COUNT=712, QUEUE_NAME=so50014436}
{QUEUE_CONSUMER_COUNT=14, QUEUE_MESSAGE_COUNT=642, QUEUE_NAME=so50014436}
{QUEUE_CONSUMER_COUNT=15, QUEUE_MESSAGE_COUNT=570, QUEUE_NAME=so50014436}
{QUEUE_CONSUMER_COUNT=15, QUEUE_MESSAGE_COUNT=495, QUEUE_NAME=so50014436}
{QUEUE_CONSUMER_COUNT=15, QUEUE_MESSAGE_COUNT=420, QUEUE_NAME=so50014436}
{QUEUE_CONSUMER_COUNT=15, QUEUE_MESSAGE_COUNT=345, QUEUE_NAME=so50014436}
{QUEUE_CONSUMER_COUNT=15, QUEUE_MESSAGE_COUNT=270, QUEUE_NAME=so50014436}
{QUEUE_CONSUMER_COUNT=15, QUEUE_MESSAGE_COUNT=195, QUEUE_NAME=so50014436}
{QUEUE_CONSUMER_COUNT=15, QUEUE_MESSAGE_COUNT=120, QUEUE_NAME=so50014436}
{QUEUE_CONSUMER_COUNT=15, QUEUE_MESSAGE_COUNT=45, QUEUE_NAME=so50014436}
{QUEUE_CONSUMER_COUNT=15, QUEUE_MESSAGE_COUNT=0, QUEUE_NAME=so50014436}
{QUEUE_CONSUMER_COUNT=15, QUEUE_MESSAGE_COUNT=0, QUEUE_NAME=so50014436}

Однако, если я подожду, пока мы не достигнем максимума, у меня возникнет проблема ...

{QUEUE_CONSUMER_COUNT=10, QUEUE_MESSAGE_COUNT=695, QUEUE_NAME=so50014436}
{QUEUE_CONSUMER_COUNT=10, QUEUE_MESSAGE_COUNT=940, QUEUE_NAME=so50014436}
{QUEUE_CONSUMER_COUNT=10, QUEUE_MESSAGE_COUNT=890, QUEUE_NAME=so50014436}
{QUEUE_CONSUMER_COUNT=11, QUEUE_MESSAGE_COUNT=847, QUEUE_NAME=so50014436}
{QUEUE_CONSUMER_COUNT=11, QUEUE_MESSAGE_COUNT=792, QUEUE_NAME=so50014436}
{QUEUE_CONSUMER_COUNT=12, QUEUE_MESSAGE_COUNT=736, QUEUE_NAME=so50014436}
{QUEUE_CONSUMER_COUNT=12, QUEUE_MESSAGE_COUNT=676, QUEUE_NAME=so50014436}
{QUEUE_CONSUMER_COUNT=12, QUEUE_MESSAGE_COUNT=616, QUEUE_NAME=so50014436}
{QUEUE_CONSUMER_COUNT=13, QUEUE_MESSAGE_COUNT=552, QUEUE_NAME=so50014436}
{QUEUE_CONSUMER_COUNT=13, QUEUE_MESSAGE_COUNT=487, QUEUE_NAME=so50014436}
{QUEUE_CONSUMER_COUNT=14, QUEUE_MESSAGE_COUNT=420, QUEUE_NAME=so50014436}
{QUEUE_CONSUMER_COUNT=14, QUEUE_MESSAGE_COUNT=350, QUEUE_NAME=so50014436}
{QUEUE_CONSUMER_COUNT=14, QUEUE_MESSAGE_COUNT=280, QUEUE_NAME=so50014436}
{QUEUE_CONSUMER_COUNT=15, QUEUE_MESSAGE_COUNT=205, QUEUE_NAME=so50014436}
Changing min to 12
{QUEUE_CONSUMER_COUNT=17, QUEUE_MESSAGE_COUNT=128, QUEUE_NAME=so50014436}
{QUEUE_CONSUMER_COUNT=17, QUEUE_MESSAGE_COUNT=43, QUEUE_NAME=so50014436}
{QUEUE_CONSUMER_COUNT=17, QUEUE_MESSAGE_COUNT=0, QUEUE_NAME=so50014436}
{QUEUE_CONSUMER_COUNT=17, QUEUE_MESSAGE_COUNT=0, QUEUE_NAME=so50014436}
{QUEUE_CONSUMER_COUNT=16, QUEUE_MESSAGE_COUNT=0, QUEUE_NAME=so50014436}
{QUEUE_CONSUMER_COUNT=16, QUEUE_MESSAGE_COUNT=0, QUEUE_NAME=so50014436}
{QUEUE_CONSUMER_COUNT=16, QUEUE_MESSAGE_COUNT=0, QUEUE_NAME=so50014436}
{QUEUE_CONSUMER_COUNT=16, QUEUE_MESSAGE_COUNT=0, QUEUE_NAME=so50014436}
{QUEUE_CONSUMER_COUNT=16, QUEUE_MESSAGE_COUNT=0, QUEUE_NAME=so50014436}
{QUEUE_CONSUMER_COUNT=16, QUEUE_MESSAGE_COUNT=0, QUEUE_NAME=so50014436}
{QUEUE_CONSUMER_COUNT=16, QUEUE_MESSAGE_COUNT=0, QUEUE_NAME=so50014436}
{QUEUE_CONSUMER_COUNT=16, QUEUE_MESSAGE_COUNT=0, QUEUE_NAME=so50014436}

..., где мы пойдем выше максимума.Непонятно, почему у вас 29.

Я открыл для этого JIRA Issue .

Тем не менее, непонятно, почему вы меняете мин на12, когда вы уже на максимуме (15).

...