Spring интеграция inbound-gateway Запустить событие, когда очередь пуста - PullRequest
2 голосов
/ 17 мая 2011

Я новичок, но постараюсь быть на службе.

{INPUT QUEUE}->[INBOUND-GATEWAY-1]-->[ROUTER]----------->(ACTIVATOR)<---------------
                                        \                                          /
                                         \-->{HOLD QUEUE}--->[INBOUND-GATEWAY-2]--^

У меня есть сценарий, в котором я должен динамически изменять условия маршрутизации в потоке, подобном первому. Сообщения, поступающие из очереди, отправляются активатору для обработки или в другую очередь для удержания. В определенное время я должен закрыть INBOUND-GATEWAY-1, чтобы новые сообщения не поступали в поток, и открыть INBOUND-GATEWAY-2, чтобы разрешить обработку всех сообщений из HOLD QUEUE. После того, как все сообщения от HOLD QUEUE были использованы, оба шлюза должны быть закрыты / открыты, как они были раньше. Дело в том, как я могу узнать, когда HOLD QUEUE пусто, чтобы я мог вызвать метод, в котором можно было запустить gateway-1?

Буду благодарен, если кто-нибудь сможет мне помочь.

Заранее спасибо

Ответы [ 2 ]

1 голос
/ 24 мая 2011

После некоторой отладки и чтения, наконец, я пришел к решению этой проблемы.Входящий шлюз - это JmsMessageDrivenEndpoint, основанный на двух внутренних компонентах, MessageListenerContainer и MessageListener.MessageListenerContainer является ответственным за планирование поведения MessageListener, поэтому, переопределяя noMessageReceived и messageReceived и добавляя некоторые атрибуты для управления желаемым поведением, я мог бы выполнить «магию».

Моя реализация MessageListenerContainer стала похожейthis.

public class ControlMessageListenerContainer extends DefaultMessageListenerContainer{

    private JmsMessageDrivenEndpoint mainInputGateway;

    private long timeOut;

    private long lastTimeReceived;  

    public PassControlMessageListenerContainer() {
        this.setAutoStartup(false);
    }

    @Override
    public void start() throws JmsException {
        /*When the container is started the lastTimeReceived is set to actial time*/
        lastTimeReceived = (new Date()).getTime();
        super.start();
    }

    @Override
    protected void noMessageReceived(Object invoker, Session session) {
        long actualTime = (new Date()).getTime();

        if((actualTime - lastTimeReceived) >= timeOut 
                && mainInputGateway != null && !mainInputGateway.isRunning()){
            mainInputGateway.start();
        }       
        super.noMessageReceived(invoker, session);
    }

    @Override
    protected void messageReceived(Object invoker, Session session) {
        /*lastTimeReceived is set again to actual time at new message arrive*/
        lastTimeReceived = (new Date()).getTime();
        super.messageReceived(invoker, session);
    }
}

И, наконец, конфигурация bean-компонента Spring выглядит следующим образом:

<bean id="listenerContainer" 
    class="org.merol.ControlMessageListenerContainer">
    <property name="mainInputGateway" ref="mainGateway" />
    <property name="destination" ref="onHoldQueue" />
    <property name="timeOut" value="10000"/>
    <property name="connectionFactory" ref="connectionFactory"/>
</bean>

<bean id="messageListener" 
    class="org.springframework.integration.jms.ChannelPublishingJmsMessageListener">
    <property name="requestChannel" ref="outputChannel" />
</bean>

<bean id="inboundGateway" 
    class="org.springframework.integration.jms.JmsMessageDrivenEndpoint">
    <constructor-arg name="listenerContainer" ref="listenerContainer" />
    <constructor-arg name="listener" ref="messageListener" />
</bean>

Надеюсь, это может быть полезно для кого-то еще.

Спасибо @Nicholasдля подсказок.

0 голосов
/ 17 мая 2011

Я бы включил эту функцию в процессоры входящего шлюза.Например:

Gateway1Processor:

  • start () : запуск потребителя из основной очереди и процесса.
  • stop () : остановка потребителя.

Gateway2Processor:

  • start () :Запустите потребителя из очереди HOLD.Укажите соответствующий тайм-аут.По истечении времени ожидания (очередь HOLD пуста) вызовите stop () .
  • stop () : запустите Gateway1Processor и остановите этопотребитель.

Следовательно, последовательность операций будет такой:

  1. Запуск Gateway1Processor
  2. В определенное время , вызов Gateway1Processor.stop () и Gateway2Processor.start ()
  3. Gateway2Processor истощит очередь HOLD, перезапустите Gateway1Processor и остановка.
  4. Перейти к # 2.
...