Верблюжий маршрут потерял сообщение о перезагрузке в Camel Rabbitmq - PullRequest
0 голосов
/ 04 апреля 2020

Я использую camel-rabbitmq. Вот мое определение маршрута

camelContext.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("rabbitmq:TEST?queue=TEST&concurrentConsumers=5")
            .routeId("jms")
            .autoStartup(false)
            .throttle(10)
            .asyncDelayed()
            .log("Consuming message ${body} to ${header.deliveryAddress}")
            .process(new Processor() {

                @Override
                public void process(Exchange exchange) throws Exception {
                        System.out.println(atomicLong.decrementAndGet());
                }
            })

            ;
        }
    }); 

Когда я отправляю sh 500 сообщений в эту очередь, когда останавливаются и запускаются маршруты, все сообщения на канале будут потеряны, интересно, куда они направляются.

Если я настраиваю тот же маршрут с &autoAck=false, он работает правильно, но теряет производительность. Почему верблюд не предлагает одинаковое поведение с и без autoAck.

Ответы [ 2 ]

0 голосов
/ 09 апреля 2020

Вам нужно проверить счетчик предварительной выборки rabbitmq потребительскую предварительную выборку Я думаю, что по умолчанию потребитель выбирает все сообщения в очереди в свои буферы памяти. Если вы установите количество предварительных выборок равным 1, потребитель будет подтверждать сообщения одно за другим. Все остальные неподтвержденные будут присутствовать в очереди в состоянии готовности. Ожидание получения после того, как потребитель завершит задание с предыдущим выбранным сообщением.

0 голосов
/ 05 апреля 2020

Я справился со своей проблемой, выполнив следующие изменения в rabbitmqconsumer of camel-rabbitmq

    public void handleCancelOk(String consumerTag) {
        // no work to do
        log.info("Received cancelOk signal on the rabbitMQ channel");
        **downLatch.countDown();**
    }
  @Override
    protected void doStop() throws Exception {
        if (channel == null) {
            return;
        }
        this.requeueChannel=openChannel(consumer.getConnection());
         if (tag != null && isChannelOpen()) {
            channel.basicCancel(tag);
        }
        stopping=true;
         downLatch.await();         

        try {
            lock.acquire();
            if (isChannelOpen()) {
                channel.close();
            }
        } catch (TimeoutException e) {
            log.error("Timeout occured");
            throw e;
        } catch (InterruptedException e1) {
            log.error("Thread Interrupted!");
        } finally {
            lock.release();
        }


    }

Выполнив этот верблюжий маршрут, мы отправим сообщение и избежали потери сообщения.

...