Получение нескольких сообщений от MQ асинхронно - PullRequest
0 голосов
/ 11 марта 2019

Я использую Spring + Hibernate + JPA в своем приложении.

Мне нужно прочитать сообщение с Websphere MQ и вставить сообщение в БД. Иногда могут быть доступны непрерывные сообщения, а иногда очень меньшее количество сообщений, а иногда мы можем ожидать отсутствие сообщений от Queue.

В настоящее время я читаю сообщения по одному и вставляю их в базу данных. Но это не сильно помогает с точки зрения производительности.

Я имею в виду, когда у меня есть кусок сообщений (например, 300 тыс. Сообщений в очереди), я не мог вставить их быстрее. Количество объектов, вставляемых в БД в секунду, не так велико. Потому что я делаю коммит для каждой сущности.

Я хочу использовать пакетную обработку в спящем режиме, чтобы я мог вставить список объектов в один коммит. (Пример: от 30 до 40 сообщений на коммит)

Вопросы:

  1. Как получить несколько сообщений из очереди? (Я проверил, что BatchMessageListenerContainer может быть полезным. Но я не смог получить некоторую ссылку)

  2. Должен ли я отделить процесс вставки БД от метода onMessage? Таким образом, этот поток будет выпущен в пул и будет доступен для выбора следующих сообщений из очереди?

  3. Использование параллельных потоков?

Текущая реализация:

Прослушиватель сообщений:

<bean id="myMessageListener" class="org.mypackage.MyMessageListener">

<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destinationName" ref="queue"/>
    <property name="messageListener" ref="myMessageListener"/>
    <property name ="concurrentConsumers" value ="10"/>
    <property name ="maxConcurrentConsumers" value ="50"/>        
</bean>

Класс слушателя:

package org.mypackage.MyMessageListener;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;

import org.mypackage.service.MyService;

public class MyMessageListener implements MessageListener {

    @Autowired
    private MyService myService;

    @Override
    public void onMessage(Message message) {
        try {
             TextMessage textMessage = (TextMessage) message;
             // parse the message
             // Process the message to DB
        } catch (JMSException e1) {
             e1.printStackTrace();
        }
    }
}

1 Ответ

1 голос
/ 29 марта 2019

Не ясно, каковы ваши требования.

Проект Spring Batch предоставляет BatchMessageListenerContainer .

Контейнер прослушивателя сообщений, адаптированный для перехвата приема сообщений с рекомендациями, предоставленными через конфигурацию.Чтобы включить пакетирование сообщений в одной транзакции, используйте TransactionInterceptor и RepeatOperationsInterceptor в цепочке рекомендаций (с менеджером транзакций, установленным в базовом классе или без него).Вместо получения одного сообщения и его обработки контейнер будет использовать RepeatOperations для получения нескольких сообщений в одном потоке.Используйте с RepeatOperations и перехватчиком транзакции.Если перехватчик транзакций использует XA, тогда используйте фабрику соединений XA или TransactionAwareConnectionFactoryProxy, чтобы синхронизировать сеанс JMS с текущей транзакцией (открывая возможность дублирования сообщений после сбоя).В последнем случае вам не нужно предоставлять диспетчер транзакций в базовом классе - он только запускается и предотвращает синхронизацию сеанса JMS с транзакцией базы данных.

...