ActiveMQ очередь недоставленных писем с весенней загрузкой - PullRequest
0 голосов
/ 09 ноября 2018

Я хотел бы реализовать базовый deadLetterQueue с использованием Java DSL, встроенного ActiveMQ и KahaDB в качестве адаптера персистентности. У меня есть текстовый файл в моем каталоге C:/JavaProjects/tmp/input. Я хотел бы поместить файл во внутреннюю очередь, произвести неудачную доставку с вызовом .rollback (), затем сообщение должно быть перенаправлено в deadLetterQueue, после чего я возьму данные из deadLetterQueue и поместу их в каталог C:/JavaProjects/tmp/output. Все сообщения успешно доставляются на incoming, но, по-видимому, они не попадают в очередь недоставленных сообщений. Как я могу это сделать?

Вот мой основной класс:

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.plist.PListStoreImpl;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

import java.io.File;
import java.util.Arrays;

@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
    SpringApplication.run(DemoApplication.class, args);
}

@Value("${activemq.broker-url}")
private String brokerUrl;

@Value("${activemq.user}")
private String userName;

@Value("${activemq.password}")
private String password;

@Bean
public BrokerService setupBroker() {

    try {
        brokerService.addConnector(brokerUrl);
        brokerService.setUseJmx(true);
        brokerService.setUseShutdownHook(false);
        brokerService.setAdvisorySupport(true);
        brokerService.setEnableStatistics(true);

brokerService.setPersistenceAdapter(createKahaDBPersistenceAdapter());
        brokerService.setTempDataStore(createKahaDBTempDataStore());

        brokerService.start();

        Thread.sleep(6000);

        brokerService.stop();

    } catch (Exception e) {
        e.printStackTrace();
    }
    return brokerService;
}

private KahaDBPersistenceAdapter createKahaDBPersistenceAdapter() {
    final KahaDBPersistenceAdapter kahaDBPersistenceAdapter = new KahaDBPersistenceAdapter();
    kahaDBPersistenceAdapter.setDirectory(new File("C:\\JavaProjects\\tmp", "activemq/kahadb"));
    kahaDBPersistenceAdapter.setCompactAcksIgnoresStoreGrowth(true);
    kahaDBPersistenceAdapter.setCompactAcksAfterNoGC(5);
    return kahaDBPersistenceAdapter;
}

private PListStoreImpl createKahaDBTempDataStore() {
    final PListStoreImpl tempKahaDBStore = new PListStoreImpl();
    tempKahaDBStore.setDirectory(new File("C:\\JavaProjects\\tmp", "activemq/tmp"));
    return tempKahaDBStore;
}

@Bean
public ActiveMQConnectionFactory activeMQConnectionFactory() {
    final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
    factory.setTrustedPackages(Arrays.asList("com.example.demo"));
    return factory;
}
}

Вот мой роутер:

@Component
public class MyRouter extends RouteBuilder {

private static Logger logger = LoggerFactory.getLogger(MyRouter.class);

@Override
public void configure() throws Exception {


    onException(Exception.class)
            .log(LoggingLevel.ERROR, logger, "THE ERROR IS: " + exceptionMessage().toString());

    errorHandler(deadLetterChannel("activemq:queue:deadLetterQueue")
                    .deadLetterHandleNewException(true)//guarantees to deadLetterChannel to complete.
                    .useOriginalMessage()
                    .backOffMultiplier(2)
                    .redeliveryDelay(100)
                    .maximumRedeliveries(3)
                    .log("TO DEAD LETTER QUEUE!"));

    /*
    * Receiving data to an internal queue "incoming",
    * then calling rollback() to send the info from internal queue to dead Letter queue
    * */

    from("file:/JavaProjects/tmp/input?noop=true")
            .to("activemq:queue:incoming");
    from("activemq:queue:incoming")
            .process(new MyProcessor())
            .rollback("'INSIDE ROLLBACK' Lets go to the DEAD QUEUE!")
            .to("file:/JavaProjects/tmp/output");

    //Getting data from Dead Letter Queue and put it to our output folder

    from("activemq:queue:deadLetterQueue")
            .process(new MyProcessor())
            .to("file:/JavaProjects/tmp/output");
}
}

Вот мой процессор:

    public class MyProcessor implements Processor{

@Override
public void process(Exchange exchange) throws Exception {

    System.out.println("\nMessage ID is: " + exchange.getIn().getMessageId());
    System.out.println("\nFile`s content is: " + exchange.getIn().getBody(String.class) + "\n");

}
}

Любая помощь будет оценена.

...