Эффективная обработка JMS - PullRequest
12 голосов
/ 09 марта 2011

У нас есть очередь JMS, которая получает очень большое количество сообщений.

Слушатель должен сохранить сообщение в базе данных, используя транзакцию базы данных, а затем зафиксировать транзакцию JMS.

Так как я могуделайте это более эффективно там, где мне не нужно выполнять фиксации базы данных и JMS для каждого сообщения.

Ответы [ 3 ]

10 голосов
/ 09 марта 2011

Не делайте это для каждого сообщения, делайте это партиями.JMS поддерживает транзакции так же, как ваша БД;начать транзакцию JMS, прочитать N сообщений.Запустите транзакцию БД, вставьте N сообщений.Коммит в JMS, коммит в БД.

Это, очевидно, открывает окно для гонки (происходит сбой между двумя коммитами).У вас есть это сейчас, но только для одного сообщения.Если вы хотите решить эту проблему, вы столкнетесь либо с просмотром транзакций XA (двухфазная фиксация), либо, по крайней мере, с какой-то схемой обнаружения дубликатов.Для некоторого введения в это взгляните на: http://activemq.apache.org/should-i-use-xa.html

8 голосов
/ 09 марта 2011

Предпосылка асинхронного обмена сообщениями, особенно при использовании MDB, заключается в том, что каждое сообщение является атомарным. Другими словами, предполагается, что результат обработки любого одного сообщения не зависит от результата обработки любого другого сообщения. Идеальное решение вашей проблемы сохранит атомарность сообщений.

Если бы вы обрабатывали несколько сообщений в одной единице работы, вы бы потеряли эту атомарность. Например, предположим, что вы решили синхронизировать каждые 25 сообщений. Если в 25-м сообщении произошла ошибка, например, проблема с преобразованием кодовой страницы, из-за которой он не мог быть извлечен из очереди, весь пакет сообщений был бы отклонен. Тогда они все будут доставлены. Количество повторных доставок для сообщений будет увеличиваться с каждым циклом чтения / возврата. Как только количество повторных доставок превысило пороговое значение, установленное на сервере приложений, все 25 сообщений будут отброшены или помещены в очередь в зависимости от вашей конфигурации. Чем больше пакет, тем больше сообщений могут быть затронуты в ситуации ошибки, потому что весь пакет живет или умирает вместе. Установите размер пакета равным 100, и в случае одного сообщения о заражении риску подвергнется 100 сообщений.

Альтернативное решение - разрешить множество потоков обработки в вашей MDB. С JMS вы можете создавать много сеансов под одним и тем же соединением. Каждый сеанс может управлять собственной единицей работы, поэтому каждый сеанс может независимо запускать транзакцию XA, получать сообщение, обновлять базу данных и затем фиксировать транзакцию. Если одно сообщение является ошибочным, затрагивается только это сообщение и обновление базы данных.

Есть исключения из этого. Например, если обрабатывается большой пакет, и все сообщения происходят от одного и того же производителя, обычно используется что-то кроме MDB для извлечения многих сообщений и обновления многих строк в рамках одной и той же единицы работы. Точно так же, если сообщения зависят от последовательности, параллельная обработка невозможна, потому что она не сохранит последовательность. Но опять же, зависимые от последовательности сообщения не являются атомарными. Опять же, в этом случае MDB не является идеальным решением.

В зависимости от вашего транспортного провайдера, количество поддерживаемых потоков может быть ограничено только объемом памяти. Например, WebSphere MQ может легко обрабатывать сотни одновременных потоков геттеров в очереди. Проверьте настройку конфигурации MDB сервера приложений, чтобы увидеть, сколько потоков вы можете раскрутить, а затем убедитесь, что ваш транспорт может справиться с нагрузкой. Затем поиграйте немного, чтобы найти оптимальное количество потоков. Производительность резко возрастет по мере увеличения потоков от одного, но только до определенного уровня. После этого вы обычно видите плато, а затем снижение, поскольку накладные расходы на управление потоками компенсируют прирост производительности. Место, где находится swe3et, зависит от того, насколько сильно загружен брокер обмена сообщениями и насколько сильно он ограничен процессором, памятью, диском или сетью.

0 голосов
/ 08 января 2016

Вот процессор jms, который будет принимать сообщения из одной очереди, добавлять их в список и отправлять обратно в другую очередь. Вы можете отрегулировать, как значения считываются и агрегируются в соответствующих методах:

public class JmsBatcher<T> {
    final Session session;
    private final MessageConsumer consumer;
    private final MessageProducer producer;
    private final int batchSize;


    public JmsBatcher(final Connection connection,
                      final String sourceQueue,
                      final String destQueue,
                      final int batchSize) throws JMSException {
        this.batchSize = batchSize;
        session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
        final Queue source = session.createQueue(sourceQueue);
        final Queue dest = session.createQueue(destQueue);
        consumer = session.createConsumer(source);
        producer = session.createProducer(dest);
    }

    public void processBatch() {
        final List<T> values = new ArrayList<>();
        try {
            while (values.size() < batchSize) {
                final Message message = consumer.receive();
                values.add(readObject(message));
                message.acknowledge();
            }
            producer.send(createAggregate(values));
            session.commit();
        } catch (Exception e) {
            // Log the exception
            try {
                session.rollback();
            } catch (JMSException re) {
                // Rollback failed, so something fataly wrong.
                throw new IllegalStateException(re);
            }
        }
    }

    private Message createAggregate(final List<T> values) throws JMSException {
        return session.createObjectMessage((Serializable) values);
    }

    private T readObject(final Message message) throws JMSException {
        return (T) ((ObjectMessage) message).getObject();
    }
}

Это можно запустить в отдельном потоке и просто запустить навсегда:

final JmsBatcher jmsBatcher =
    new JmsBatcher(connection, "single", "batch", 25);
new Thread(() -> {
    while (true) {
        jmsBatcher.processBatch();
    }
}).start();

Затем вы можете фиксировать базу данных из пакетных результатов. При возникновении сбоев транзакция будет повторена.

...