Нет улучшения производительности с несколькими потребителями - PullRequest
2 голосов
/ 29 мая 2019

Я пытаюсь получить сообщения от кролика mq, используя несколько потребителей.

Для моего основного варианта использования: у нас в очереди 50К сообщений, и каждый потребитель читает одно сообщение из очереди и выполняет алгоритм с запросом 50 БД, а затем отправляет ack кролику mq.

Моя проблема: скорость в секунду остается постоянной для 5 потребителей, 10 потребителей, 20 потребителей, т. Е. 11 Ack / sec.

Я пытался комментировать мои запросы вставки в алгоритме. Поэтому я думаю, что в коде нет проблем с блокировкой.

    public void init() {
        try {
            factory.setUri(System.getenv("CLOUDAMQP_URL"));
            connection = factory.newConnection();
        } catch (IOException | TimeoutException | KeyManagementException | NoSuchAlgorithmException
                | URISyntaxException e) {
            log.error(e.getMessage(), e);
        }
    }

    public void processJobInstance() {
        for (int i = 0; i < batchJobInstanceList.size(); i++) {

                for (int c = 0; c < NUMBER_OF_CONSUMERS_PER_JOB; c++) {
                    try {
                        /*
                         * Do not close channel here as doWork is a future task. We will close channel in handle
                         * delivery.
                         */
                        Channel channel = connection.createChannel();
                        final Consumer consumer = createConsumerBean(channel, batchJobInstance.getId());
        boolean autoAck = false;
        channel.basicQos(1);
        channel.basicConsume(batchJobInstance.getId(), autoAck, consumer);
                    } catch (Exception e) {
                        log.error("Channel failed by Exception.", e);
                    }
                }
        }
    }

        @Bean
    @Scope("prototype")
    public Consumer createConsumerBean(Channel channel, final String id) {
        return new DefaultConsumer(channel) {

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                    byte[] body) throws IOException {

                try {
// Algo with 50 select calls and 5-10 insert                

                } catch (Exception ex) {
                    log.error("HandleDelivery failed by Exception.", ex);
                } finally {
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }

            @Override
            public void handleCancel(String consumerTag) throws IOException {
                log.info("Cancelled: " + consumerTag);
            }
        };
    }

Что плохого в моем коде, что увеличение числа потребителей не улучшает производительность?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...