Я пытаюсь получить сообщения от кролика mq, используя несколько потребителей.
Для моего основного варианта использования: у нас в очереди 50К сообщений, и каждый потребитель читает одно сообщение из очереди и выполняет алгоритм с запросом 50 БД, а затем отправляет ack кролику mq.
Моя проблема: скорость в секунду остается постоянной для 5 потребителей, 10 потребителей, 20 потребителей, т. Е. 11 Ack / sec.
Я пытался комментировать мои запросы вставки в алгоритме. Поэтому я думаю, что в коде нет проблем с блокировкой.
public void init() {
try {
factory.setUri(System.getenv("CLOUDAMQP_URL"));
connection = factory.newConnection();
} catch (IOException | TimeoutException | KeyManagementException | NoSuchAlgorithmException
| URISyntaxException e) {
log.error(e.getMessage(), e);
}
}
public void processJobInstance() {
for (int i = 0; i < batchJobInstanceList.size(); i++) {
for (int c = 0; c < NUMBER_OF_CONSUMERS_PER_JOB; c++) {
try {
/*
* Do not close channel here as doWork is a future task. We will close channel in handle
* delivery.
*/
Channel channel = connection.createChannel();
final Consumer consumer = createConsumerBean(channel, batchJobInstance.getId());
boolean autoAck = false;
channel.basicQos(1);
channel.basicConsume(batchJobInstance.getId(), autoAck, consumer);
} catch (Exception e) {
log.error("Channel failed by Exception.", e);
}
}
}
}
@Bean
@Scope("prototype")
public Consumer createConsumerBean(Channel channel, final String id) {
return new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
try {
// Algo with 50 select calls and 5-10 insert
} catch (Exception ex) {
log.error("HandleDelivery failed by Exception.", ex);
} finally {
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
@Override
public void handleCancel(String consumerTag) throws IOException {
log.info("Cancelled: " + consumerTag);
}
};
}
Что плохого в моем коде, что увеличение числа потребителей не улучшает производительность?