Apache Верблюд, потребляющий из нескольких очередей на одном экземпляре ActiveMQ - PullRequest
0 голосов
/ 31 марта 2020

У меня есть один экземпляр сервера ActiveMQ, работающий с несколькими источниками данных, которые помещают данные в две очереди + один DLQ в случае сбоя при получении сообщения. Я использую Apache Camel для приема и обработки сообщений из этих очередей и хочу записать его в InfluxDB.

Однако до сих пор мне не удавалось запустить стек, так что Apache Camel потребляет все очереди в параллели. Я всегда сталкивался с ошибками такого рода:

ERROR 20831 --- [r[ActiveMQ.DLQ]]
c.c.j.DefaultJmsMessageListenerContainer : Could not refresh JMS
Connection for destination 'ActiveMQ.DLQ' - retrying using
FixedBackOff{interval=5000, currentAttempts=270,
maxAttempts=unlimited}. Cause: Broker: localhost - Client: Influx
Message Queue already connected from tcp://ip-of-machine-running-route:port

Как я могу использовать из нескольких очередей по одному Apache экземпляру Camel?

Я пробовал два подхода:

В настоящее время мой код выглядит следующим образом:

Camel Config

@Configuration
public class CamelConfig {

    @Bean
    public ShutdownStrategy shutdownStrategy() {
        MessageLogger.logInfo(getClass(), "Camel Route: STARTING...",
                Thread.currentThread().getStackTrace()[0].getMethodName());
        DefaultShutdownStrategy strategy = new DefaultShutdownStrategy();
        int                     timeout  = 1200;
        MessageLogger.logInfo(getClass(), "Camel Route: Timeout for shutdown: " + timeout + " seconds.",
                Thread.currentThread().getStackTrace()[0].getMethodName());
        strategy.setTimeout(timeout); // TODO make it configurable
        return strategy;
    }
}

Конфигурация клиента ActiveMQ

@Configuration
public class ActiveMqClientConfig {

    @Bean
    public ActiveMQConnectionFactory registerActiveMQConnectionFactory() {
        MessageLogger.logInfo(getClass(), "ActiveMQ Listener: STARTING...",
                Thread.currentThread().getStackTrace()[0].getMethodName());
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL("tcp://servername:61616");
        connectionFactory.setUserName(username);
        connectionFactory.setPassword(passwd);
        connectionFactory.setUseAsyncSend(false);
        connectionFactory.setClientID("Influx Message Queue");
        connectionFactory.setConnectResponseTimeout(300);
        MessageLogger.logInfo(getClass(), "ActiveMQ Listener: STARTED",
                Thread.currentThread().getStackTrace()[0].getMethodName());
        return connectionFactory;
    }
}

Influx Config

@Configuration
public class InfluxDBClientConfig {

    @Bean
    public InfluxDbOkHttpClientBuilderProvider registerInfluxDbOkHttpClientBuilderProvider() {
        return () -> {
            MessageLogger.logInfo(getClass(), "InfluxDB Client: STARTING...",
                    Thread.currentThread().getStackTrace()[0].getMethodName());
            Builder builder = new OkHttpClient.Builder() //
                    .readTimeout(1200, TimeUnit.SECONDS) //
                    .writeTimeout(1200, TimeUnit.SECONDS) //
                    .connectTimeout(1200, TimeUnit.SECONDS) //
                    .retryOnConnectionFailure(true);
            MessageLogger.logInfo(getClass(), "InfluxDB Client: STARTED - " + builder.toString(),
                    Thread.currentThread().getStackTrace()[0].getMethodName());
            return builder;
        };
    }
}

Компонент с несколькими маршрутами:

@Component
public class ActiveMqToInfluxRoute extends RouteBuilder {
    @Autowired
    private FrameworkConfig frameworkConfig;

    @Override
    public void configure() throws Exception {
        String consumerQueueq = "activemq:queue:queue1?"               //
                + "brokerURL=tcp://ip:port";
        String consumerActiveMqDLQ    = "activemq:queue:ActiveMQ.DLQ?"                     //
                + "brokerURL=tcp://ip:port";
        String consumerQueue2           = "activemq:queue:queue2?"                     //
                + "brokerURL=tcp://ip:port";
        String emitterInfluxDB        = "influxdb://influxDb?databaseName=databaseName"          
                + "&batch=true"                                                            //
                + "&retentionPolicy=retentionPolicy"
        String emitterStreamOut       = "stream:out";

        //************************************************************************
        // Data from cryring_db_inbound to InfluxDB
        //************************************************************************
        from(consumerCryringInbound) //   
                .process(messagePayload -> {
                    Message message = messagePayload.getIn();
                    if (message.getBody(String.class).toString().startsWith("@MultiRecords")) {
                        Processor.processMessage(message.getBody(String.class), message);
                    } else {
                        Processor.processMessage(message);
                    }
                })//
                .to(emitterInfluxDB) //
                .onException(Exception.class) //
                .useOriginalMessage() //
                .handled(true) //
                .log("error") //
                .to(emitterStreamOut);

        //************************************************************************
        // Data from cryring_db_inbound to InfluxDB
        //************************************************************************
        from(consumerActiveMqDLQ) //
                .process(messagePayload -> {
                    Message message = messagePayload.getIn();
                    if (message.getBody(String.class).toString().startsWith("@MultiRecords")) {
                        Processor.processMessage(message.getBody(String.class), message);
                    } else {
                        Processor.processMessage(message);
                    }
                })//
                .to(emitterInfluxDB) //
                .onException(Exception.class) //
                .useOriginalMessage() //
                .handled(true) //
                .log("error") //
                .to(emitterStreamOut);

        //************************************************************************
        // Data from olog_inbound to olog
        //************************************************************************
        from(consumerOlog) //
                .process(messagePayload -> {
                    System.out.println(messagePayload.getIn());
                }) //
                .to(emitterStreamOut);
    }
}

Ответы [ 2 ]

1 голос
/ 31 марта 2020

Только один клиент может использовать ClientID. Они должны быть уникальными, а не то, что вы, вероятно, хотите установить вручную. Другим вариантом может быть установка ClientIDPrefix, чтобы получить лучший способ определить, какое приложение потребляет.

0 голосов
/ 01 апреля 2020

В отличие от того, что предлагает Внедрение нескольких потребителей верблюдов предлагает (несколько маршрутов от (). До () в одном @Component) Мне удалось выполнить эту работу, разделив маршруты на отдельные компоненты каждый, каждый из которых имеет индивидуальный идентификатор клиента. Кроме того, я заменил идентификатор клиента stati c в конфигурации ActiveMQ на UUID. Код:

ActiveMQ-Config:

@Configuration
public class ActiveMqClientConfig {

    @Bean
    public ActiveMQConnectionFactory registerActiveMQConnectionFactory() {
        MessageLogger.logInfo(getClass(), "ActiveMQ Listener: STARTING...",
                Thread.currentThread().getStackTrace()[0].getMethodName());
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL("tcp://servername:port");
        connectionFactory.setUserName(username);
        connectionFactory.setPassword(password);
        connectionFactory.setUseAsyncSend(false);
        connectionFactory.setClientID(UUID.randomUUID().toString());
        connectionFactory.setConnectResponseTimeout(300);
        MessageLogger.logInfo(getClass(), "ActiveMQ Listener: STARTED",
                Thread.currentThread().getStackTrace()[0].getMethodName());
        return connectionFactory;
    }
}

Компоненты маршрута:

@Component
public class ActiveMqToInfluxRoute extends RouteBuilder {
    @Autowired
    private FrameworkConfig frameworkConfig;

    @Override
    public void configure() throws Exception {
        String consumerCryringInbound = "activemq:queue:queue1?"
                + "brokerURL=tcp://activemq-server-ip:port" 
                + "clientId=clientid1";

        String emitterInfluxDB = "influxdb://influxDb?databaseName=influx_db_name"
                + "&batch=true"                                                           
                + "&retentionPolicy=retentionPolicy";

        String emitterStreamOut       = "stream:out";

        //************************************************************************
        // Data from cryring_db_inbound to InfluxDB
        //************************************************************************
        from(consumerCryringInbound)    
                .process(processor code)
                .to(emitterInfluxDB)
                .onException(Exception.class) 
                .useOriginalMessage()
                .handled(true) 
                .log("error") 
                .to(emitterStreamOut);
    }
}

... похожие для других маршрутов, каждый с индивидуальным clientId.

...