Пакетная / массовая обработка сообщений (JMS) с использованием Wildfly - PullRequest
0 голосов
/ 28 декабря 2018

У меня есть очередь JMS, которая заполняется некоторыми данными временных рядов.Чтобы предотвратить тысячи вставок SQL одной транзакции, я хочу обработать их громоздким способом, а не методом MessageListener onMessage «на сообщение».

Единственное решение, о котором я подумал, - это расписание, которое доставляет большую часть сообщений из очереди и периодически их сохраняет.

@Stateless
public class SensorDataReceiver {

    private static final int THRESHOLD_IN_SECONDS = 10;

    private static final int QUEUE_TIMEOUT_IN_MILLIS = 1000;

    @Resource(mappedName = "java:jboss/jms/queue/sensorData")
    private Queue queue;

    @Inject
    private JMSContext context;

    @Inject
    private SensorDataDAO sensorDataDAO;

    @SneakyThrows
    @Schedule(hour = "*", minute = "*", second = "*/15", persistent = false)
    public void scheduled() {
        LocalDateTime statUpPlusThreshold = now().plusSeconds(THRESHOLD_IN_SECONDS);
        JMSConsumer consumer = context.createConsumer(queue);

        List<SensorData> sensorDataToInsert = new ArrayList<>();
        do {
            ObjectMessage message = (ObjectMessage) consumer.receive(QUEUE_TIMEOUT_IN_MILLIS);

            if (message == null) {
                break;
            }

            sensorDataToInsert.add((sensorData) message.getObject());
        } while (now().isBefore(statUpPlusThreshold) && sensorDataToInsert.size() < 10_000);

        logger.info(format("Got \"%d\" SensorData to persist.", sensorDataToInsert.size()));
        sensorDataDAO.batchSaveOrUpdate(sensorDataToInsert);
        logger.info(format("Persisted \"%d\" SensorData.", sensorDataToInsert.size()));
    }
}

Но я не думаю, что это самый разумный способ сделать это, поэтому я трачу время на обработку большего количества сообщений в минуту, когда расписание выполняется быстрее, чем настроенный интервал (я могу вставить 10 тысяч строк примерно за 2-3 секунды в моей тестовой системе), а с другой стороны, этот код склонен к «перекрывающемуся запланированному выполнению».

1 Ответ

0 голосов
/ 30 декабря 2018

Я бы порекомендовал иметь пул bean-компонентов без сохранения состояния, которые активны все время (т.е. они не запланированы), которые потребляют установленное количество сообщений (т.е. не до тех пор, пока очередь не станет пустой, что будет произвольным числом сообщений) изатем, которые вставляют данные из этих сообщений в одну операцию базы данных.

Все бины в пуле могут быть активны одновременно и могут использовать и вставлять свои пакеты как можно быстрее.Это обеспечит своевременное использование сообщений, что, как мы надеемся, позволит избежать накопления сообщений в очереди.

Вы можете установить тайм-аут на receive, чтобы, если вы действительно достигнете концаочередь до того, как будет достигнут размер пакета, данные будут по-прежнему вставляться своевременно.

Для того, чтобы начать этот процесс при запуске сервера приложений, вы можете пометить компонент с помощью @Startup и @Singleton изатем аннотируйте метод с помощью @PostConstruct, который зацикливается достаточно времени, чтобы заполнить ваш «пул», и вызывает метод для вашего компонента @Stateless, который будет получать пакеты сообщений и обрабатывать их.

...