Отправить 35000 сообщений в минуту в минуту - PullRequest
1 голос
/ 10 июня 2019

У нас есть приложение с пружинной загрузкой для выполнения нагрузочного теста на одном другом компоненте. Нам нужно отправлять максимум 35000 сообщений JMS в минуту, и по этой причине я использую планировщик для запуска задачи каждую минуту.

Проблема в том, что когда я сохраняю низкую интенсивность, мне удается отправлять сообщения в течение указанного интервала времени (одна минута). Но когда интенсивность высокая, отправка фрагмента сообщений занимает более 1 минуты. Любые предложения по реализации ниже?

Класс планировщика

@Component
public class MessageScheduler {

private final Logger log = LoggerFactory.getLogger(getClass());
private static ScheduledExecutorService executorService = Executors.newScheduledThreadPool(16);
private final static int TIME_PERIOD = ConfigFactory.getConfig().getInt("messages.period").orElse(60000);

@Autowired
JmsSender sender;

    public void startScheduler() {
       Runnable runnableTask = sender::sendMessagesChunk;
       executorService.scheduleAtFixedRate(runnableTask, 0, TIME_PERIOD, 
       TimeUnit.MILLISECONDS);
    }
}

Класс для отправки сообщений

@Component
public class JmsSender {

@Autowired
TrackingManager manager;

private final Logger log = LoggerFactory.getLogger(getClass());
private final static int TOTAL_MESSAGES = ConfigFactory.getConfig().getInt("total.tracking.messages").orElse(10);
private final static int TIME_PERIOD = ConfigFactory.getConfig().getInt("messages.period").orElse(60000);
private static int failedPerPeriod=0;
private static int totalFailed=0;
private static int totalMessageCounter=0;

public void sendMessagesChunk() {
    log.info("Started  at: {}", Instant.now());
    log.info("Sending messages with intensity {} messages/minute", TOTAL_MESSAGES);
    for (int i=0; i<TOTAL_MESSAGES; i++) {
        try {
            long start = System.currentTimeMillis();
            MessageDTO msg = manager.createMessage();
            send(msg);
            long stop = System.currentTimeMillis();
            if (timeOfDelay(stop-start)>=0L) {
                Thread.sleep(timeOfDelay(stop-start));
            }
        } catch (Exception e) {
            log.info("Error :  " + e.getMessage());
            failedPerPeriod++;
        }
    }
    totalMessageCounter += TOTAL_MESSAGES;
    totalFailed += failedPerPeriod;
    log.info("Finished  at: {}", Instant.now());
    log.info("Success rate(of last minute): {} %, Succeeded: {}, Failed: {}, Success rate(in total): {} %, Succeeded: {}, Failed: {}"
            ,getSuccessRatePerPeriod(), getSuccededPerPeriod(), failedPerPeriod,
            getTotalSuccessRate(), getTotalSucceded(), totalFailed);
    failedPerPeriod =0;
}

private long timeOfDelay(Long elapsedTime){
    return (TIME_PERIOD / TOTAL_MESSAGES) - elapsedTime;
}
private int getSuccededPerPeriod(){
    return TOTAL_MESSAGES - failedPerPeriod;
}

private int getTotalSucceded(){
    return totalMessageCounter - totalFailed;
}

private double getSuccessRatePerPeriod(){
    return getSuccededPerPeriod()*100D / TOTAL_MESSAGES;
}

private double getTotalSuccessRate(){
    return getTotalSucceded()*100D / totalMessageCounter;
}

private void send(MessageDTO messageDTO) throws Exception {
    requestContextInitializator();
    JmsClient client = JmsClientBuilder.newClient(UriScheme.JmsType.AMQ);
    client.target(new URI("activemq:queue:" + messageDTO.getDestination()))
            .msgTypeVersion(messageDTO.getMsgType(), messageDTO.getVersion())
            .header(Header.MSG_VERSION, messageDTO.getVersion())
            .header(Header.MSG_TYPE, messageDTO.getMsgType())
            .header(Header.TRACKING_ID, UUID.randomUUID().toString())
            .header(Header.CLIENT_ID, "TrackingJmsClient")
            .post(messageDTO.getPayload());
}

1 Ответ

3 голосов
/ 11 июня 2019

Вы должны решить две проблемы:

  1. общее время операции отправки должно быть меньше максимального времени.
  2. сообщения должны отправляться не так быстро, как это возможно, вместо этого они должны быть отправленыравномерно по всему доступному времени.

Очевидно, что если ваш метод send слишком медленный, максимальное время будет превышено.

Более быстрый способ отправки сообщений состоит в использовании некоторыхвид массовых операций.Неважно, если ваш MQ API не поддерживает массовые операции, вы не можете его использовать!из-за второго ограничения («равномерно»).

Вы можете отправлять сообщения асинхронно, но если ваш MQ API создает потоки для этого вместо «неблокирующей» асинхронности, у вас могут возникнуть проблемы с памятью.

Используя javax.jms.MessageProducer.send, вы можете отправлять сообщения асинхронно, но для каждого из них будет создан новый поток (будет создано много памяти и потоков сервера).

Еще одно ускорение может создать только одинJMS-клиент (ваш метод send).

Чтобы выполнить второе требование, вы должны исправить свою функцию timeOfDelay, это неправильно.На самом деле, вы должны учитывать распределение вероятностей функции send, чтобы оценить правильное значение, но вы можете просто сделать:

    long accTime = 0L;
    for (int i=0; i<TOTAL_MESSAGES; i++) {
        try {
            long start = System.currentTimeMillis();
            MessageDTO msg = manager.createMessage();
            send(msg);
            long stop = System.currentTimeMillis();
            accTime += stop - start;
            if(accTime < TIME_PERIOD)
                Thread.sleep((TIME_PERIOD - accTime) / (TOTAL_MESSAGES - i));
        } catch (Exception e) {
            log.info("Error :  " + e.getMessage());
            failedPerPeriod++;
        }
    }
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...