У меня есть Java-приложение с несколькими компонентами, взаимодействующими через JMS (ActiveMQ).В настоящее время приложение и JMS-концентратор находятся на одном сервере, хотя в конечном итоге мы планируем разделить компоненты для обеспечения масштабируемости.В настоящее время мы сталкиваемся с серьезными проблемами с производительностью, по-видимому, вокруг JMS, в первую очередь, и в центре внимания этого вопроса находится количество времени, которое требуется для публикации сообщения в теме.
У нас около 50 динамическисозданные темы, используемые для связи между компонентами приложения.Один компонент читает записи из таблицы и обрабатывает их по одной, обработка включает создание сообщения объекта JMS и публикацию его в одной из тем.Эта обработка не могла соответствовать скорости записи записей в исходную таблицу ~ 23 / сек, поэтому мы изменили обработку, чтобы создать сообщение объекта JMS и добавить его в очередь.Был создан новый поток, который прочитал из этой очереди и опубликовал сообщение в соответствующей теме.Очевидно, что это не ускоряет обработку, но позволяет нам увидеть, насколько далеко мы отставали, посмотрев на размер очереди.
В начале дня никакие сообщения не проходят через всю систему, это быстро увеличивает скорость с 1560000 (433 / сек) сообщений через концентратор в первый час до 2100000 (582 / сек) в 3-мчас, а затем оставаться на этом уровне.Однако в начале первого часа публикация сообщений из компонента, считывающего записи из таблицы базы данных, продолжается, к концу этого часа в очереди ожидается 2000 сообщений, ожидающих отправки, и к 3-му часу в очереди 9000 сообщений.в нем.
Ниже приведены соответствующие разделы кода, которые отправляют сообщения JMS, любые советы о том, что мы делаем неправильно или как мы можем улучшить эту производительность, высоко ценится.Просматривая статистику в Интернете, JMS должна легко обрабатывать ~ 1000-2000 больших сообщений / сек или ~ 10000 маленьких сообщений / сек.Наши сообщения имеют размер около 500 байт каждое, поэтому я представляю, что где-то посередине этой шкалы.
Код для получения издателя:
private JmsSessionPublisher getJmsSessionPublisher(String topicName) throws JMSException {
if (!this.topicPublishers.containsKey(topicName)) {
TopicSession pubSession = (ActiveMQTopicSession) topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQTopic topic = getTopic(topicName, pubSession);
// Create a JMS publisher and subscriber
TopicPublisher publisher = pubSession.createPublisher(topic);
this.topicPublishers.put(topicName, new JmsSessionPublisher(pubSession, publisher));
}
return this.topicPublishers.get(topicName);
}
Отправка сообщения:
JmsSessionPublisher jmsSessionPublisher = getJmsSessionPublisher(topicName);
ObjectMessage objMessage = jmsSessionPublisher.getSession().createObjectMessage(messageObj);
objMessage.setJMSCorrelationID(correlationID);
objMessage.setJMSTimestamp(System.currentTimeMillis());
jmsSessionPublisher.getPublisher().publish(objMessage, false, 4, 0);
Код, который добавляет сообщения в очередь:
List<EventQueue> events = eventQueueDao.getNonProcessedEvents();
for (EventQueue eventRow : events) {
IEvent event = eventRow.getEvent();
AbstractEventFactory.EventType eventType = AbstractEventFactory.EventType.valueOf(event.getEventType());
String topic = event.getTopicName() + topicSuffix;
EventMsgPayload eventMsg = AbstractEventFactory.getFactory(eventType).getEventMsgPayload(event);
synchronized (queue) {
queue.add(new QueueElement(eventRow.getEventId(), topic, eventMsg));
queue.notify();
}
}
Код в потоке, удаляющий элементы из очереди:
jmsSessionFactory.publishMessageToTopic(e.getTopic(), e.getEventMsg(), Integer.toString(e.getEventMsg().hashCode()));
publishMessageToTopic выполняет приведенный выше код «Отправка сообщения».
Другие реализации JMS являются опцией, если все согласны с тем, что ActiveMQ может быть не лучшим вариантом.
Спасибо,
Джеймс