По крайней мере в ActiveMQ все, что вы хотите, полностью поддерживается, его зовут VirtualTopic
Концепция:
- Вы создаете VirtualTopic (Простое создание темы с префиксом
VirtualTopic.
) например.VirtualTopic.Color
- Создание потребителя, подписавшегося на этот VirtualTopic , соответствующий этому шаблону
Consumer.<clientName>.VirtualTopic.<topicName>
например.Consumer.client1.VirtualTopic.Color
, делая это, Activemq создаст очередь с этим именем, и эта очередь подпишется на VirtualTopic.Color
, затем каждое сообщение, опубликованное в этой виртуальной теме, будет доставлено в очередь client1 Обратите внимание, что он работает как биржи rabbitmq. - Все готово, теперь вы можете использовать client1 очередь, как и все очереди, со многими потребителями, DLQ, настроенной политикой повторной доставки и т. д.
- На данный момент, я думаю, вы поняли, что вы можете создать client2 , client3 и сколько подписчиков вы хотите, все они получат копиюсообщение опубликовано
VirtualTopic.Color
Здесь код
@Component
public class ColorReceiver {
private static final Logger LOGGER = LoggerFactory.getLogger(MailReceiver.class);
@Autowired
private JmsTemplate jmsTemplate;
// simply generating data to the topic
long id=0;
@Scheduled(fixedDelay = 500)
public void postMail() throws JMSException, IOException {
final Color colorName = new Color[]{Color.BLUE, Color.RED, Color.WHITE}[new Random().nextInt(3)];
final Color color = new Color(++id, colorName.getName());
final ActiveMQObjectMessage message = new ActiveMQObjectMessage();
message.setObject(color);
message.setProperty("color", color.getName());
LOGGER.info("status=color-post, color={}", color);
jmsTemplate.convertAndSend(new ActiveMQTopic("VirtualTopic.color"), message);
}
/**
* Listen all colors messages
*/
@JmsListener(
destination = "Consumer.client1.VirtualTopic.color", containerFactory = "colorContainer"
selector = "color <> 'RED'"
)
public void genericReceiveMessage(Color color) throws InterruptedException {
LOGGER.info("status=GEN-color-receiver, color={}", color);
}
/**
* Listen only red colors messages
*
* the destination ClientId have not necessary exists (it means that his name can be a fancy name), the unique requirement is that
* the containers clientId need to be different between each other
*/
@JmsListener(
// destination = "Consumer.redColorContainer.VirtualTopic.color",
destination = "Consumer.client1.VirtualTopic.color",
containerFactory = "redColorContainer", selector = "color='RED'"
)
public void receiveMessage(ObjectMessage message) throws InterruptedException, JMSException {
LOGGER.info("status=RED-color-receiver, color={}", message.getObject());
}
/**
* Listen all colors messages
*/
@JmsListener(
destination = "Consumer.client2.VirtualTopic.color", containerFactory = "colorContainer"
)
public void genericReceiveMessage2(Color color) throws InterruptedException {
LOGGER.info("status=GEN-color-receiver-2, color={}", color);
}
}
@SpringBootApplication
@EnableJms
@EnableScheduling
@Configuration
public class Config {
/**
* Each @JmsListener declaration need a different containerFactory because ActiveMQ requires different
* clientIds per consumer pool (as two @JmsListener above, or two application instances)
*
*/
@Bean
public JmsListenerContainerFactory<?> colorContainer(ActiveMQConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
final DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setConcurrency("1-5");
configurer.configure(factory, connectionFactory);
// container.setClientId("aId..."); lets spring generate a random ID
return factory;
}
@Bean
public JmsListenerContainerFactory<?> redColorContainer(ActiveMQConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
// necessary when post serializable objects (you can set it at application.properties)
connectionFactory.setTrustedPackages(Arrays.asList(Color.class.getPackage().getName()));
final DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setConcurrency("1-2");
configurer.configure(factory, connectionFactory);
return factory;
}
}
public class Color implements Serializable {
public static final Color WHITE = new Color("WHITE");
public static final Color BLUE = new Color("BLUE");
public static final Color RED = new Color("RED");
private String name;
private long id;
// CONSTRUCTORS, GETTERS AND SETTERS
}