Как я могу обрабатывать несколько сообщений одновременно из темы JMS (не очереди) с Java и Spring 3.0? - PullRequest
14 голосов
/ 22 июня 2010

Обратите внимание, что я хотел бы, чтобы несколько прослушивателей сообщений обрабатывали последовательные сообщения из темы одновременно.Кроме того, я бы хотел, чтобы каждый прослушиватель сообщений работал транзакционно, чтобы сбой обработки в данном прослушивателе сообщений приводил к тому, что сообщение этого прослушивателя остается в теме.

Похоже, что Spring DefaultMessageListenerContainer поддерживает одновременность только для очередей JMS.

Нужно ли создавать несколько экземпляров DefaultMessageListenerContainers?

Если время течет по вертикальной оси:

ListenerA reads msg 1        ListenerB reads msg 2        ListenerC reads msg 3
ListenerA reads msg 4        ListenerB reads msg 5        ListenerC reads msg 6
ListenerA reads msg 7        ListenerB reads msg 8        ListenerC reads msg 9
ListenerA reads msg 10       ListenerB reads msg 11       ListenerC reads msg 12
...

ОБНОВЛЕНИЕ:
Спасибо за ваши отзывы @ T.Rob и @ skaffman.

Я закончил тем, что создал несколько DefaultMessageListenerContainers с concurrency=1, а затем поместил логику в прослушиватель сообщений так, чтобы только одинпоток обработает заданный идентификатор сообщения.

Ответы [ 8 ]

5 голосов
/ 22 июня 2010

Вам не нужно несколько DefaultMessageListenerContainer экземпляров, нет, но вам нужно настроить одновременное использование DefaultMessageListenerContainer, используя свойство concurrentConsumers :

Укажите количество одновременных потребителей для создания.Значение по умолчанию: 1.

Если задать более высокое значение для этого параметра, будет увеличен стандартный уровень запланированных одновременных потребителей во время выполнения: фактически это минимальное количество одновременных потребителей, которое будет запланировано в любой момент времени.Это статическая настройка;для динамического масштабирования рассмотрите возможность указания параметра «maxConcurrentConsumers».

Рекомендуется увеличение числа одновременных потребителей, чтобы масштабировать потребление сообщений, поступающих из очереди.Однако обратите внимание, что любые гарантии заказа теряются после регистрации нескольких потребителей.В общем случае, придерживайтесь 1 потребителя для очередей с малым объемом.

Однако внизу есть большое предупреждение:

Не увеличивайте количество одновременныхпотребители по теме .Это приведет к одновременному потреблению одного и того же сообщения, что вряд ли когда-либо желательно.

Это интересно и имеет смысл, когда вы об этом думаете.То же самое произошло бы, если бы у вас было несколько DefaultMessageListenerContainer экземпляров.

Я думаю, что, возможно, вам нужно переосмыслить свой дизайн, хотя я не уверен, что я бы предложил.Одновременное потребление сообщений pub / sub кажется вполне разумным, но как избежать одновременной доставки одного и того же сообщения всем вашим потребителям?

3 голосов
/ 21 мая 2017

По крайней мере в ActiveMQ все, что вы хотите, полностью поддерживается, его зовут VirtualTopic

Концепция:

  1. Вы создаете VirtualTopic (Простое создание темы с префиксом VirtualTopic.) например.VirtualTopic.Color
  2. Создание потребителя, подписавшегося на этот VirtualTopic , соответствующий этому шаблону Consumer.<clientName>.VirtualTopic.<topicName> например.Consumer.client1.VirtualTopic.Color, делая это, Activemq создаст очередь с этим именем, и эта очередь подпишется на VirtualTopic.Color, затем каждое сообщение, опубликованное в этой виртуальной теме, будет доставлено в очередь client1 Обратите внимание, что он работает как биржи rabbitmq.
  3. Все готово, теперь вы можете использовать client1 очередь, как и все очереди, со многими потребителями, DLQ, настроенной политикой повторной доставки и т. д.
  4. На данный момент, я думаю, вы поняли, что вы можете создать client2 , client3 и сколько подписчиков вы хотите, все они получат копиюсообщение опубликовано VirtualTopic.Color

Здесь код

@Component
public class ColorReceiver {

    private static final Logger LOGGER = LoggerFactory.getLogger(MailReceiver.class);

    @Autowired
    private JmsTemplate jmsTemplate;

    // simply generating data to the topic
    long id=0;
    @Scheduled(fixedDelay = 500)
    public void postMail() throws JMSException, IOException {

        final Color colorName = new Color[]{Color.BLUE, Color.RED, Color.WHITE}[new Random().nextInt(3)];
        final Color color = new Color(++id, colorName.getName());
        final ActiveMQObjectMessage message = new ActiveMQObjectMessage();
        message.setObject(color);
        message.setProperty("color", color.getName());
        LOGGER.info("status=color-post, color={}", color);
        jmsTemplate.convertAndSend(new ActiveMQTopic("VirtualTopic.color"), message);
    }

    /**
     * Listen all colors messages
     */
    @JmsListener(
        destination = "Consumer.client1.VirtualTopic.color", containerFactory = "colorContainer"
        selector = "color <> 'RED'"
    )
    public void genericReceiveMessage(Color color) throws InterruptedException {
        LOGGER.info("status=GEN-color-receiver, color={}", color);
    }

    /**
     * Listen only red colors messages
     *
     * the destination ClientId have not necessary exists (it means that his name can be a fancy name), the unique requirement is that
     * the containers clientId need to be different between each other
     */
    @JmsListener(
//      destination = "Consumer.redColorContainer.VirtualTopic.color",
        destination = "Consumer.client1.VirtualTopic.color",
        containerFactory = "redColorContainer", selector = "color='RED'"
    )
    public void receiveMessage(ObjectMessage message) throws InterruptedException, JMSException {
        LOGGER.info("status=RED-color-receiver, color={}", message.getObject());
    }

    /**
     * Listen all colors messages
     */
    @JmsListener(
        destination = "Consumer.client2.VirtualTopic.color", containerFactory = "colorContainer"
    )
    public void genericReceiveMessage2(Color color) throws InterruptedException {
        LOGGER.info("status=GEN-color-receiver-2, color={}", color);
    }

}

@SpringBootApplication
@EnableJms
@EnableScheduling
@Configuration
public class Config {

    /**
     * Each @JmsListener declaration need a different containerFactory because ActiveMQ requires different
     * clientIds per consumer pool (as two @JmsListener above, or two application instances)
     * 
     */
    @Bean
    public JmsListenerContainerFactory<?> colorContainer(ActiveMQConnectionFactory connectionFactory, 
        DefaultJmsListenerContainerFactoryConfigurer configurer) {

        final DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrency("1-5");
        configurer.configure(factory, connectionFactory);
        // container.setClientId("aId..."); lets spring generate a random ID
        return factory;
    }

    @Bean
    public JmsListenerContainerFactory<?> redColorContainer(ActiveMQConnectionFactory connectionFactory,
        DefaultJmsListenerContainerFactoryConfigurer configurer) {

        // necessary when post serializable objects (you can set it at application.properties)
        connectionFactory.setTrustedPackages(Arrays.asList(Color.class.getPackage().getName()));

        final DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrency("1-2");
        configurer.configure(factory, connectionFactory);
        return factory;
    }

}

public class Color implements Serializable {

    public static final Color WHITE = new Color("WHITE");
    public static final Color BLUE = new Color("BLUE");
    public static final Color RED = new Color("RED");

    private String name;
    private long id;

    // CONSTRUCTORS, GETTERS AND SETTERS
}
2 голосов
/ 01 октября 2012

Вот возможность:

1) создать только один DMLC, сконфигурированный с bean-компонентом и методом для обработки входящего сообщения.Установите для его параллелизма значение 1.

2) Сконфигурируйте исполнителя задачи с # нитями, равными желаемому вами параллелизму.Создайте пул объектов для объектов, которые должны обрабатывать сообщение.Дайте ссылку на исполнителя задач и пул объектов на bean-компонент, настроенный вами в # 1.Пул объектов полезен, если фактический компонент обработки сообщений не является потокобезопасным.

3) Для входящего сообщения компонент в DMLC создает пользовательский Runnable, указывает его на сообщение и пул объектов и даетэто исполнителю задачи.

4) Метод run Runnable получает компонент из пула объектов и вызывает его метод 'process' с сообщением.

# 4 можно управлять с помощьюпрокси и пул объектов, чтобы упростить его.

Я еще не пробовал это решение, но, кажется, оно отвечает всем требованиям.Обратите внимание, что это решение не так надежно, как EJB MDB.Например, Spring не будет отбрасывать объект из пула, если он генерирует исключение RuntimeException.

1 голос
/ 18 января 2019

Разрешено использование нескольких потребителей по одной и той же теме подписки в JMS 2.0, хотя в случае JMS 1.1 это было не так. Пожалуйста, обратитесь: https://www.oracle.com/technetwork/articles/java/jms2messaging-1954190.html

1 голос
/ 23 июня 2010

Это один из тех случаев, когда различия в транспортных провайдерах всплывают из-за абстракции JMS.JMS хочет предоставить копию сообщения каждому подписчику по теме.Но поведение, которое вы хотите, - это, действительно, поведение очереди.Я подозреваю, что существуют другие требования, приводящие это к решению pub / sub, которые не были описаны - например, другие вещи должны подписываться на одну и ту же тему независимо от вашего приложения.

Если бы я делал это в WebSphereРешением MQ было бы создание административной подписки, в результате чего в очередь помещалась бы одна копия каждого сообщения по заданной теме.Тогда ваши несколько подписчиков могут бороться за сообщения в этой очереди.Таким образом, ваше приложение может иметь несколько потоков, между которыми распределяются сообщения, и в то же время другие подписчики, независимые от этого приложения, могут динамически (не) подписываться на одну и ту же тему.

К сожалению, нет общего JMS-портативный способ сделать это.Вы в значительной степени зависите от реализации поставщика услуг транспорта.Единственный из них, о котором я могу поговорить, - это WebSphere MQ, но я уверен, что другие транспорты поддерживают это так или иначе и в разной степени, если вы креативны.

0 голосов
/ 20 ноября 2018

Создание пользовательского задания, казалось бы, решило проблему для меня, без дублирования обработки:

@Configuration
class BeanConfig {
    @Bean(destroyMethod = "shutdown")
    public ThreadPoolTaskExecutor topicExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setAllowCoreThreadTimeOut(true);
        executor.setKeepAliveSeconds(300);
        executor.setCorePoolSize(4);
        executor.setQueueCapacity(0);
        executor.setThreadNamePrefix("TOPIC-");
        return executor;
    }

    @Bean
    JmsListenerContainerFactory<?> topicListenerFactory(ConnectionFactory connectionFactory, DefaultJmsListenerContainerFactoryConfigurer configurer, @Qualifier("topicExecutor") Executor topicExecutor) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setPubSubDomain(true);
        configurer.configure(factory, connectionFactory);
        factory.setPubSubDomain(true);
        factory.setSessionTransacted(false);
        factory.setSubscriptionDurable(false);
        factory.setTaskExecutor(topicExecutor);
        return factory;
    }

}

class MyBean {
    @JmsListener(destination = "MYTOPIC", containerFactory = "topicListenerFactory", concurrency = "1")
    public void receiveTopicMessage(SomeTopicMessage message) {}
}
0 голосов
/ 23 марта 2012

наткнулся на этот вопрос.Моя конфигурация:

Создайте bean-компонент с id="DefaultListenerContainer", добавьте свойство name="concurrentConsumers" value="10" и свойство name="maxConcurrentConsumers" value ="50".

Пока все работает отлично.Я напечатал идентификатор потока и убедился, что несколько потоков действительно создаются, а также используются повторно.

0 голосов
/ 09 марта 2012

Я столкнулся с той же проблемой. В настоящее время я изучаю RabbitMQ, который, кажется, предлагает идеальное решение в схеме проектирования, которую они называют «рабочими очередями». Больше информации здесь: http://www.rabbitmq.com/tutorials/tutorial-two-java.html

Если вы не полностью привязаны к JMS, вы можете посмотреть на это. Может также существовать мост от JMS к AMQP, но это может показаться странным.

Я получаю удовольствие (читай: трудности) с установкой и запуском RabbitMQ на моем Mac, но думаю, что я близок к тому, чтобы он работал, я отправлю ответ, если смогу решить эту проблему.

...