У меня есть четыре точные копии службы, которые среди прочего ловят сообщения из определенной очереди, используя конечные точки Apache Camel RabbitMQ. Каждый маршрут выглядит так:
//Start Process from RabbitMQ queue
from("rabbitmq://" +
System.getenv("ADVERTISE_ADDRESS") +
"/" +
System.getenv("RABBITMQ_EXCHANGE_NAME") +
"?routingKey=" +
System.getenv("RABBITMQ_ROUTING_KEY") +
"&autoAck=true")
.process(exchange -> exchange.getIn().setBody(exchange.getIn().getBody()))
.unmarshal().json(JsonLibrary.Jackson, TwitterBean.class)
.transform().method(ResponseTransformer.class, "transformtwitterBean")
.marshal().json(JsonLibrary.Jackson)
.setHeader(Exchange.HTTP_METHOD, constant("POST"))
.setHeader(Exchange.CONTENT_TYPE, constant("application/json"))
.to("http4://" + System.getenv("ADVERTISE_ADDRESS") + ":" + System.getenv("CAMUNDA_PORT") + "/rest/process-definition/key/MainProcess/start")
.log("Response: ${body}");
Прямо сейчас каждая конечная точка обрабатывает сообщение.
Хотя вариант «одновременных потребителей» по умолчанию один.
Я предположил, что, возможно, мои сообщения не были подтверждены,
поэтому я установил для параметра autoAck значение true.
Это не помогло, как я могу сделать эти услуги конкурирующими потребителями?
EDIT:
Фрагмент кода из конфигурации моего приложения-издателя:
@Configuration
public class RabbitMqConfig {
@Bean
Queue queue() {
return new Queue(System.getenv("RABBITMQ_QUEUE_NAME"), true);
}
@Bean
DirectExchange exchange() {
return new DirectExchange(System.getenv("RABBITMQ_EXCHANGE_NAME"), true, true);
}
@Bean
Binding binding(Queue queue, DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(System.getenv("RABBITMQ_ROUTING_KEY"));
}
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
public AmqpTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(jsonMessageConverter());
return rabbitTemplate;
}
}