Как исправить странную ошибку "channelMax" rabbitmq rpc? - PullRequest
0 голосов
/ 14 мая 2019

Я создал простой клиент и сервер. Клиент отправляет запросы rpc:

RabbitTemplate template.convertSendAndReceive(...) ;

Сервер получает его и отвечает обратно:

@RabbitListener(queues = "#{queue.getName()}")
public Object handler(@Payload String key)...

Затем я заставляю клиента отправлять запросы rpc асинхронно, одновременно (что приводит к множеству одновременных запросов rpc).

И неожиданно получаю ошибку:

org.springframework.amqp.AmqpResourceNotAvailableException: достигнут предел ChannelMax. Попробуй позже. в org.springframework.amqp.rabbit.connection.SimpleConnection.createChannel (SimpleConnection.java:59) в org.springframework.amqp.rabbit.connection.CachingConnectionFactory $ ChannelCachingConnectionProxy.createBareChannel (CachingConnectionFactory.java:1208) в org.springframework.amqp.rabbit.connection.CachingConnectionFactory $ ChannelCachingConnectionProxy.access $ 200 (CachingConnectionFactory.java:1196) в org.springframework.amqp.rabbit.connection.CachingConnectionFactory.doCreateBareChannel (CachingConnectionFactory.java:599) в org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createBareChannel (CachingConnectionFactory.java:582) в org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getCachedChannelProxy (CachingConnectionFactory.java:552) в org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getChannel (CachingConnectionFactory.java:534) в org.springframework.amqp.rabbit.connection.CachingConnectionFactory.access $ 1400 (CachingConnectionFactory.java:99) в org.springframework.amqp.rabbit.connection.CachingConnectionFactory $ ChannelCachingConnectionProxy.createChannel

Клиент Rabbitmq, похоже, создает слишком много каналов. Как это исправить? И почему мой клиент создал их так много?

1 Ответ

1 голос
/ 14 мая 2019

Каналы кэшируются, поэтому должно быть только столько каналов, сколько существует реальных вызовов RPC.

Возможно, вам потребуется увеличить максимальную настройку канала в брокере.

EDIT

Если ваши вызовы RPC являются долгоживущими, вы можете сократить время использования канала, используя AsyncRabbitTemplate с явной очередью ответа, и избежать использования функции прямого ответа на запрос..

См. Документацию .

EDIT2

Вот пример использования AsyncRabbitTemplate;он отправляет 1000 сообщений в 100 потоках (а у потребителя 100 потоков).

Общее количество используемых каналов было 107 - 100 для потребителей, и только 7 использовались для отправки.

@SpringBootApplication
public class So56126654Application {

    public static void main(String[] args) {
        SpringApplication.run(So56126654Application.class, args);
    }

    @RabbitListener(queues = "so56126654", concurrency = "100")
    public String slowService(String in) throws InterruptedException {
        Thread.sleep(5_000L);
        return in.toUpperCase();
    }

    @Bean
    public ApplicationRunner runner(AsyncRabbitTemplate asyncTemplate) {
        ExecutorService exec = Executors.newFixedThreadPool(100);
        return args -> {
            System.out.println(asyncTemplate.convertSendAndReceive("foo").get());
            for (int i = 0; i < 1000; i++) {
                int n = i;
                exec.execute(() -> {
                    RabbitConverterFuture<Object> future = asyncTemplate.convertSendAndReceive("foo" + n);
                    try {
                        System.out.println(future.get(10, TimeUnit.SECONDS));
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        e.printStackTrace();
                    }
                    catch (ExecutionException e) {
                        e.printStackTrace();
                    }
                    catch (TimeoutException e) {
                        e.printStackTrace();
                    }
                });
            }
        };
    }

    @Bean
    public AsyncRabbitTemplate asyncTemplate(ConnectionFactory connectionFactory) {
        return new AsyncRabbitTemplate(connectionFactory, "", "so56126654", "so56126654-replies");
    }

    @Bean
    public Queue queue() {
        return new Queue("so56126654");
    }

    @Bean
    public Queue reeplyQueue() {
        return new Queue("so56126654-replies");
    }

}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...