Я хотел бы реализовать базовый deadLetterQueue с использованием Java DSL, встроенного ActiveMQ и KahaDB в качестве адаптера персистентности. У меня есть текстовый файл в моем каталоге C:/JavaProjects/tmp/input
. Я хотел бы поместить файл во внутреннюю очередь, произвести неудачную доставку с вызовом .rollback (), затем сообщение должно быть перенаправлено в deadLetterQueue, после чего я возьму данные из deadLetterQueue и поместу их в каталог C:/JavaProjects/tmp/output
. Все сообщения успешно доставляются на incoming
, но, по-видимому, они не попадают в очередь недоставленных сообщений. Как я могу это сделать?
Вот мой основной класс:
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.plist.PListStoreImpl;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import java.io.File;
import java.util.Arrays;
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@Value("${activemq.broker-url}")
private String brokerUrl;
@Value("${activemq.user}")
private String userName;
@Value("${activemq.password}")
private String password;
@Bean
public BrokerService setupBroker() {
try {
brokerService.addConnector(brokerUrl);
brokerService.setUseJmx(true);
brokerService.setUseShutdownHook(false);
brokerService.setAdvisorySupport(true);
brokerService.setEnableStatistics(true);
brokerService.setPersistenceAdapter(createKahaDBPersistenceAdapter());
brokerService.setTempDataStore(createKahaDBTempDataStore());
brokerService.start();
Thread.sleep(6000);
brokerService.stop();
} catch (Exception e) {
e.printStackTrace();
}
return brokerService;
}
private KahaDBPersistenceAdapter createKahaDBPersistenceAdapter() {
final KahaDBPersistenceAdapter kahaDBPersistenceAdapter = new KahaDBPersistenceAdapter();
kahaDBPersistenceAdapter.setDirectory(new File("C:\\JavaProjects\\tmp", "activemq/kahadb"));
kahaDBPersistenceAdapter.setCompactAcksIgnoresStoreGrowth(true);
kahaDBPersistenceAdapter.setCompactAcksAfterNoGC(5);
return kahaDBPersistenceAdapter;
}
private PListStoreImpl createKahaDBTempDataStore() {
final PListStoreImpl tempKahaDBStore = new PListStoreImpl();
tempKahaDBStore.setDirectory(new File("C:\\JavaProjects\\tmp", "activemq/tmp"));
return tempKahaDBStore;
}
@Bean
public ActiveMQConnectionFactory activeMQConnectionFactory() {
final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
factory.setTrustedPackages(Arrays.asList("com.example.demo"));
return factory;
}
}
Вот мой роутер:
@Component
public class MyRouter extends RouteBuilder {
private static Logger logger = LoggerFactory.getLogger(MyRouter.class);
@Override
public void configure() throws Exception {
onException(Exception.class)
.log(LoggingLevel.ERROR, logger, "THE ERROR IS: " + exceptionMessage().toString());
errorHandler(deadLetterChannel("activemq:queue:deadLetterQueue")
.deadLetterHandleNewException(true)//guarantees to deadLetterChannel to complete.
.useOriginalMessage()
.backOffMultiplier(2)
.redeliveryDelay(100)
.maximumRedeliveries(3)
.log("TO DEAD LETTER QUEUE!"));
/*
* Receiving data to an internal queue "incoming",
* then calling rollback() to send the info from internal queue to dead Letter queue
* */
from("file:/JavaProjects/tmp/input?noop=true")
.to("activemq:queue:incoming");
from("activemq:queue:incoming")
.process(new MyProcessor())
.rollback("'INSIDE ROLLBACK' Lets go to the DEAD QUEUE!")
.to("file:/JavaProjects/tmp/output");
//Getting data from Dead Letter Queue and put it to our output folder
from("activemq:queue:deadLetterQueue")
.process(new MyProcessor())
.to("file:/JavaProjects/tmp/output");
}
}
Вот мой процессор:
public class MyProcessor implements Processor{
@Override
public void process(Exchange exchange) throws Exception {
System.out.println("\nMessage ID is: " + exchange.getIn().getMessageId());
System.out.println("\nFile`s content is: " + exchange.getIn().getBody(String.class) + "\n");
}
}
Любая помощь будет оценена.