Балансировка нагрузки Spring в очереди JMS - PullRequest
0 голосов
/ 24 августа 2018

Я хотел бы взять сообщения JMS из одной входной очереди и развернуть их в очереди вывода N.

У меня есть простой поток, который будет пересылать сообщения в один пункт назначения, но не могу понять, какприменить LoadBalancer, чтобы разрешить несколько назначений в циклическом режиме.

Есть идеи, как это сделать?

@Configuration
public class TestLoadBalance {

    public static final String INPUT_QUEUE = "_dev.lb.input";
    public static final String OUTPUT_QUEUE_PREFIX = "_dev.lb.output-";


    @Bean
    public IntegrationFlow testLoadBalanceFlow(
            ConnectionFactory jmsConnectionFactory) {

        IntegrationFlow flow =  IntegrationFlows.from(
                Jms.messageDrivenChannelAdapter(jmsConnectionFactory)
                        .destination(INPUT_QUEUE)
        )
                .handle(buildOutput(jmsConnectionFactory, 1))
                // cant have 2nd handle. gets warn & flow end:
                // The 'currentComponent' (org.springframework.integration.jms.JmsSendingMessageHandler@516462cc) 
                // is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'
                //.handle(buildOutput(jmsConnectionFactory, 2))
                .get();
        return flow;
    }


    private JmsSendingMessageHandler buildOutput(ConnectionFactory jmsConnectionFactory, int i){
        return Jms.outboundAdapter(jmsConnectionFactory)
                .destination(OUTPUT_QUEUE_PREFIX + i).get();
    }
}

Ответы [ 2 ]

0 голосов
/ 24 августа 2018

Основываясь на примерах Гэри, я применил метод destinationExpression:

@Configuration
public class TestLoadBalance {

    public static final String INPUT_QUEUE = "_dev.lb.input";
    public static final String OUTPUT_QUEUE_PREFIX = "_dev.lb.output-";

    @Bean
    public JmsDestinationPartitioner partitioner() {
        return new JmsDestinationPartitioner(OUTPUT_QUEUE_PREFIX,1,3);
    }

    @Bean
    public IntegrationFlow testLoadBalanceFlow(
            ConnectionFactory jmsConnectionFactory) {

        IntegrationFlow flow =  IntegrationFlows.from(
                Jms.messageDrivenChannelAdapter(jmsConnectionFactory)
                        .destination(INPUT_QUEUE)
        )
                .handle(Jms.outboundAdapter((jmsConnectionFactory))
                        .destinationExpression("@partitioner.nextDestination()"))
                .get();
        return flow;
    }

}

Обертка вокруг AtomicInt для обработки имен с префиксом:

public class JmsDestinationPartitioner {

    private int min;
    private int max;
    private String prefix;

    private AtomicInteger current;

    public JmsDestinationPartitioner(String prefix, int min, int max){
        this.prefix = prefix;
        this.min = min;
        this.max = max;
        current = new AtomicInteger(min);
    }


    public int getAndIncrement(){
        int  i = current.get();
        current.getAndIncrement();
        if (current.get() > max){
            current.set(min);
        }
        return i;
    }

    public String nextDestination(){
        return prefix + getAndIncrement();
    }
}
0 голосов
/ 24 августа 2018

Есть несколько способов сделать это; Вы можете иметь несколько подписчиков на канале ...

@Bean
public IntegrationFlow inbound(ConnectionFactory cf) {
    return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(cf)
                .destination("foo"))
            .channel(roundRobin())
            .get();
}

@Bean
public DirectChannel roundRobin() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow outbound1(ConnectionFactory cf) {
    return IntegrationFlows.from(roundRobin())
            .bridge() // otherwise log() will wire tap the roundRobin channel
            .log()
            .log(new LiteralExpression("Sending to bar"))
            .handle(Jms.outboundAdapter(cf)
                    .destination("bar"))
            .get();
}

@Bean
public IntegrationFlow outbound2(ConnectionFactory cf) {
    return IntegrationFlows.from(roundRobin())
            .bridge() // otherwise log() will wire tap the roundRobin channel
            .log()
            .log(new LiteralExpression("Sending to baz"))
            .handle(Jms.outboundAdapter(cf)
                    .destination("baz"))
            .get();
}

Или вы можете использовать целевое выражение:

@Bean
public AtomicInteger toggle() {
    return new AtomicInteger();
}

@Bean
public IntegrationFlow inbound(ConnectionFactory cf) {
    return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(cf)
                    .destination("foo"))
            .handle(Jms.outboundAdapter(cf)
                    .destinationExpression("@toggle.getAndIncrement() % 2 == 0 ? 'bar' : 'baz'"))
            .get();
}

@JmsListener(destination = "bar")
public void bar(String in) {
    System.out.println("received " + in + " from bar");
}

@JmsListener(destination = "baz")
public void baz(String in) {
    System.out.println("received " + in + " from baz");
}

Результат:

received test1 from bar
received test2 from baz
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...