У меня есть один экземпляр сервера ActiveMQ, работающий с несколькими источниками данных, которые помещают данные в две очереди + один DLQ в случае сбоя при получении сообщения. Я использую Apache Camel для приема и обработки сообщений из этих очередей и хочу записать его в InfluxDB.
Однако до сих пор мне не удавалось запустить стек, так что Apache Camel потребляет все очереди в параллели. Я всегда сталкивался с ошибками такого рода:
ERROR 20831 --- [r[ActiveMQ.DLQ]]
c.c.j.DefaultJmsMessageListenerContainer : Could not refresh JMS
Connection for destination 'ActiveMQ.DLQ' - retrying using
FixedBackOff{interval=5000, currentAttempts=270,
maxAttempts=unlimited}. Cause: Broker: localhost - Client: Influx
Message Queue already connected from tcp://ip-of-machine-running-route:port
Как я могу использовать из нескольких очередей по одному Apache экземпляру Camel?
Я пробовал два подхода:
В настоящее время мой код выглядит следующим образом:
Camel Config
@Configuration
public class CamelConfig {
@Bean
public ShutdownStrategy shutdownStrategy() {
MessageLogger.logInfo(getClass(), "Camel Route: STARTING...",
Thread.currentThread().getStackTrace()[0].getMethodName());
DefaultShutdownStrategy strategy = new DefaultShutdownStrategy();
int timeout = 1200;
MessageLogger.logInfo(getClass(), "Camel Route: Timeout for shutdown: " + timeout + " seconds.",
Thread.currentThread().getStackTrace()[0].getMethodName());
strategy.setTimeout(timeout); // TODO make it configurable
return strategy;
}
}
Конфигурация клиента ActiveMQ
@Configuration
public class ActiveMqClientConfig {
@Bean
public ActiveMQConnectionFactory registerActiveMQConnectionFactory() {
MessageLogger.logInfo(getClass(), "ActiveMQ Listener: STARTING...",
Thread.currentThread().getStackTrace()[0].getMethodName());
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL("tcp://servername:61616");
connectionFactory.setUserName(username);
connectionFactory.setPassword(passwd);
connectionFactory.setUseAsyncSend(false);
connectionFactory.setClientID("Influx Message Queue");
connectionFactory.setConnectResponseTimeout(300);
MessageLogger.logInfo(getClass(), "ActiveMQ Listener: STARTED",
Thread.currentThread().getStackTrace()[0].getMethodName());
return connectionFactory;
}
}
Influx Config
@Configuration
public class InfluxDBClientConfig {
@Bean
public InfluxDbOkHttpClientBuilderProvider registerInfluxDbOkHttpClientBuilderProvider() {
return () -> {
MessageLogger.logInfo(getClass(), "InfluxDB Client: STARTING...",
Thread.currentThread().getStackTrace()[0].getMethodName());
Builder builder = new OkHttpClient.Builder() //
.readTimeout(1200, TimeUnit.SECONDS) //
.writeTimeout(1200, TimeUnit.SECONDS) //
.connectTimeout(1200, TimeUnit.SECONDS) //
.retryOnConnectionFailure(true);
MessageLogger.logInfo(getClass(), "InfluxDB Client: STARTED - " + builder.toString(),
Thread.currentThread().getStackTrace()[0].getMethodName());
return builder;
};
}
}
Компонент с несколькими маршрутами:
@Component
public class ActiveMqToInfluxRoute extends RouteBuilder {
@Autowired
private FrameworkConfig frameworkConfig;
@Override
public void configure() throws Exception {
String consumerQueueq = "activemq:queue:queue1?" //
+ "brokerURL=tcp://ip:port";
String consumerActiveMqDLQ = "activemq:queue:ActiveMQ.DLQ?" //
+ "brokerURL=tcp://ip:port";
String consumerQueue2 = "activemq:queue:queue2?" //
+ "brokerURL=tcp://ip:port";
String emitterInfluxDB = "influxdb://influxDb?databaseName=databaseName"
+ "&batch=true" //
+ "&retentionPolicy=retentionPolicy"
String emitterStreamOut = "stream:out";
//************************************************************************
// Data from cryring_db_inbound to InfluxDB
//************************************************************************
from(consumerCryringInbound) //
.process(messagePayload -> {
Message message = messagePayload.getIn();
if (message.getBody(String.class).toString().startsWith("@MultiRecords")) {
Processor.processMessage(message.getBody(String.class), message);
} else {
Processor.processMessage(message);
}
})//
.to(emitterInfluxDB) //
.onException(Exception.class) //
.useOriginalMessage() //
.handled(true) //
.log("error") //
.to(emitterStreamOut);
//************************************************************************
// Data from cryring_db_inbound to InfluxDB
//************************************************************************
from(consumerActiveMqDLQ) //
.process(messagePayload -> {
Message message = messagePayload.getIn();
if (message.getBody(String.class).toString().startsWith("@MultiRecords")) {
Processor.processMessage(message.getBody(String.class), message);
} else {
Processor.processMessage(message);
}
})//
.to(emitterInfluxDB) //
.onException(Exception.class) //
.useOriginalMessage() //
.handled(true) //
.log("error") //
.to(emitterStreamOut);
//************************************************************************
// Data from olog_inbound to olog
//************************************************************************
from(consumerOlog) //
.process(messagePayload -> {
System.out.println(messagePayload.getIn());
}) //
.to(emitterStreamOut);
}
}