Конкурирующие потребители на конечной точке Apache Camel RabbitMQ - PullRequest
0 голосов
/ 03 мая 2018

enter image description here

У меня есть четыре точные копии службы, которые среди прочего ловят сообщения из определенной очереди, используя конечные точки Apache Camel RabbitMQ. Каждый маршрут выглядит так:

//Start Process from RabbitMQ queue
    from("rabbitmq://" +
            System.getenv("ADVERTISE_ADDRESS") +
            "/" +
            System.getenv("RABBITMQ_EXCHANGE_NAME") +
            "?routingKey=" +
            System.getenv("RABBITMQ_ROUTING_KEY") +
            "&autoAck=true")
            .process(exchange -> exchange.getIn().setBody(exchange.getIn().getBody()))
            .unmarshal().json(JsonLibrary.Jackson, TwitterBean.class)
            .transform().method(ResponseTransformer.class, "transformtwitterBean")
            .marshal().json(JsonLibrary.Jackson)
            .setHeader(Exchange.HTTP_METHOD, constant("POST"))
            .setHeader(Exchange.CONTENT_TYPE, constant("application/json"))
            .to("http4://" + System.getenv("ADVERTISE_ADDRESS") + ":" + System.getenv("CAMUNDA_PORT") + "/rest/process-definition/key/MainProcess/start")
            .log("Response: ${body}");

Прямо сейчас каждая конечная точка обрабатывает сообщение. Хотя вариант «одновременных потребителей» по умолчанию один. Я предположил, что, возможно, мои сообщения не были подтверждены, поэтому я установил для параметра autoAck значение true.

Это не помогло, как я могу сделать эти услуги конкурирующими потребителями?

EDIT:

Фрагмент кода из конфигурации моего приложения-издателя:

@Configuration
public class RabbitMqConfig {
    @Bean
    Queue queue() {
        return new Queue(System.getenv("RABBITMQ_QUEUE_NAME"), true);
    }

    @Bean
    DirectExchange exchange() {
        return new DirectExchange(System.getenv("RABBITMQ_EXCHANGE_NAME"), true, true);
    }

    @Bean
    Binding binding(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(System.getenv("RABBITMQ_ROUTING_KEY"));
    }

    @Bean
    public MessageConverter jsonMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }

    public AmqpTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        return rabbitTemplate;
    }
}

Ответы [ 2 ]

0 голосов
/ 03 мая 2018

Проблема в том, что вы не называете свою очередь на стороне службы

На основании документации camel apache rabbitmq это означает, что для очереди генерируется случайное имя.

Итак:

  • у вас есть издатель, который отправляет сообщение на exchange
  • затем каждый из ваших сервисов создает очередь со случайным именем, а связывает его с биржей

Каждая служба, имеющая собственную очередь, связанную с одним и тем же обменом, получит одинаковые сообщения.

Чтобы избежать этого, вам нужно указать имя очереди, так что каждая служба будет подключаться к одной и той же очереди , что будет означать, что они будут делиться потреблением сообщений с другими экземплярами службы.

0 голосов
/ 03 мая 2018

Похоже, у вас нет очереди, но тема . См. здесь для сравнения.

Брокер сообщений отвечает за предоставление сообщения очереди только одному потребителю , независимо от того, сколько из них присутствует.

...