Как просматривать сообщения из очереди, используя Apache Camel? - PullRequest
0 голосов
/ 27 июня 2018

Мне нужно просматривать сообщения с активного mq по маршруту Camel без использования сообщений.

Сообщения в очереди JMS должны быть прочитаны (только просмотрены и не использованы) и перемещены в базу данных, гарантируя, что исходная очередь остается нетронутой.

public class CamelStarter {

   private static CamelContext camelContext;

            public static void main(String[] args) throws Exception {
                            camelContext = new DefaultCamelContext();
                            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_BROKER_URL);

                            camelContext.addComponent("jms",  JmsComponent.jmsComponent(connectionFactory));

                            camelContext.addRoutes(new RouteBuilder() {

                                            @Override
                                            public void configure() throws Exception {
                                                from("jms:queue:testQueue").to("browse:orderReceived") .to("jms:queue:testQueue1");
                                            }

                                            }
                            );

                            camelContext.start();

                            Thread.sleep(1000);

                             inspectReceivedOrders();

                            camelContext.stop();

            }

public static void inspectReceivedOrders () {

BrowsableEndpoint browse = camelContext.getEndpoint("browse:orderReceived", BrowsableEndpoint.class);
List<Exchange> exchanges = browse.getExchanges();
System.out.println("Browsing queue: "+ browse.getEndpointUri() + " size: " + exchanges.size());
for (Exchange exchange : exchanges) {
  String payload = exchange.getIn().getBody(String.class);
  String msgId = exchange.getIn().getHeader("JMSMessageID", String.class);
  System.out.println(msgId + "=" +payload);

}

Ответы [ 2 ]

0 голосов
/ 28 июня 2018

Насколько я знаю, в Camel невозможно прочитать (не потребляя!) Сообщения JMS :-( Единственный обходной путь, который я нашел (в приложении JEE), состоял в том, чтобы определить загрузочный EJB с таймером, удерживая QueueBrowser и делегировать обработку msg маршруту Camel:

@Singleton
@Startup
public class MyQueueBrowser  {

    private TimerService timerService;

    @Resource(mappedName="java:/jms/queue/com.company.myqueue")
    private Queue sourceQueue;

    @Inject
    @JMSConnectionFactory("java:/ConnectionFactory")
    private JMSContext jmsContext;  

    @Inject
    @Uri("direct:readMessage")
    private ProducerTemplate camelEndpoint;


    @PostConstruct
    private void init() {       
        TimerConfig timerConfig = new TimerConfig(null, false);
        ScheduleExpression se = new ScheduleExpression().hour("*").minute("*/"+frequencyInMin);
        timerService.createCalendarTimer(se, timerConfig);
    }


    @Timeout
    public void scheduledExecution(Timer timer) throws Exception {      
        QueueBrowser browser = null;
        try {                       
            browser = jmsContext.createBrowser(sourceQueue);                                           
            Enumeration<Message> msgs = browser.getEnumeration();
            while ( msgs.hasMoreElements() ) { 
                Message jmsMsg = msgs.nextElement(); 
                // + here: read body and/or properties of jmsMsg                                            
                camelEndpoint.sendBodyAndHeader(body, myHeaderName, myHeaderValue);
            }                                                                               
        } catch (JMSRuntimeException jmsException) {
            ...
        } finally {        
            browser.close();
        }
    }


}
0 голосов
/ 27 июня 2018

Компонент Apache Camel Browse предназначен именно для этого. Проверьте здесь для документации.

Не могу сказать больше, так как вы не предоставили никакой другой информации.

Предположим, у вас есть такой маршрут

from("activemq:somequeue).to("bean:someBean")  

или

from("activemq:somequeue).process(exchange -> {})  

Все, что вам нужно сделать, это поместить конечную точку просмотра между этими

from("activemq:somequeue).to("browse:someHandler").to("bean:someBean")   

Тогда напишите такой класс

@Component
public class BrowseQueue {

  @Autowired
  CamelContext camelContext;

  public void inspect() {
    BrowsableEndpoint browse = camelContext.getEndpoint("browse:someHandler", BrowsableEndpoint.class);
    List<Exchange> exchanges = browse.getExchanges();


    for (Exchange exchange : exchanges) {
      ...... 
    }
  }

}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...