Корреляция сообщений между двумя очередями JMS с использованием компонентов интеграции Spring - PullRequest
0 голосов
/ 22 марта 2019

У меня есть 2 JMS-очереди, и мое приложение подписывается на обе из них с компонентом Jms.messageDrivenChannelAdapter(...).

Первая очередь принимает сообщения типа Paid. Вторая очередь принимает сообщения типа Reversal.

Бизнес-сценарий определяет корреляцию между сообщениями типа Paid и типа Reversal.

Reversal должен ждать Paid для обработки.

Как я могу добиться такого шаблона ожидания с помощью Spring Integration?

Можно ли соотнести сообщения между двумя очередями JMS?

1 Ответ

1 голос
/ 22 марта 2019

См. документацию об агрегаторе .

Агрегатор коррелирует сообщения, используя некоторую стратегию корреляции, и освобождает группу на основе некоторой стратегии выпуска.

Агрегатор объединяет группу связанных сообщений, сопоставляя и сохраняя их, пока группа не будет считаться завершенной. В этот момент агрегатор создает одно сообщение, обрабатывая всю группу, и отправляет агрегированное сообщение в качестве вывода.

Выходная полезная нагрузка - это список сгруппированных полезных нагрузок по умолчанию, но вы можете предоставить собственный обработчик вывода.

EDIT

@SpringBootApplication
public class So55299268Application {

    public static void main(String[] args) {
        SpringApplication.run(So55299268Application.class, args);
    }

    @Bean
    public IntegrationFlow in1(ConnectionFactory connectionFactory) {
        return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(connectionFactory)
                    .destination("queue1"))
                .channel("aggregator.input")
                .get();
    }

    @Bean
    public IntegrationFlow in2(ConnectionFactory connectionFactory) {
        return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(connectionFactory)
                    .destination("queue2"))
                .channel("aggregator.input")
                .get();
    }

    @Bean
    public IntegrationFlow aggregator() {
        return f -> f
                .aggregate(a -> a
                        .correlationExpression("headers.jms_correlationId")
                        .releaseExpression("size() == 2")
                        .expireGroupsUponCompletion(true)
                        .expireGroupsUponTimeout(true)
                        .groupTimeout(5_000L)
                        .discardChannel("discards.input"))
                .handle(System.out::println);
    }

    @Bean
    public IntegrationFlow discards() {
        return f -> f.handle((p, h) -> {
            System.out.println("Aggregation timed out for " + p);
            return null;
        });
    }

    @Bean
    public ApplicationRunner runner(JmsTemplate template) {
        return args -> {
            send(template, "one", "two");
            send(template, "three", null);
        };
    }

    private void send(JmsTemplate template, String one, String two) {
        template.convertAndSend("queue1", one, m -> {
            m.setJMSCorrelationID(one);
            return m;
        });
        if (two != null) {
            template.convertAndSend("queue2", two, m -> {
                m.setJMSCorrelationID(one);
                return m;
            });
        }
    }

}

и

GenericMessage [payload = [two, one], заголовки = {jms_redelivered = false, jms_destination = queue: // queue1, jms_correlationId = one, id = 784535fe-8861-1b22-2cfa-cc2e67763674, приоритет = 4, jms_timestamp = 1553290921442, jms_messageId = ID: Gollum2.local-55540-1553290921241-4: 1: 3: 1: 1, отметка времени = 1553290921457}]

2019-03-22 17: 42: 06.460 INFO 55396 --- [ask-scheduler-1] o.s.i.a.AggregatingMessageHandler: группа сообщений с истекающим сроком действия с корреляциейKey [three]

Время агрегации истекло в течение трех

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...