Spring Integration - агрегат publishSubscribeChannel после разделения - PullRequest
0 голосов
/ 27 июня 2018

У меня есть поток на основе DSL, который использует split для перебора списка объектов и отправки сообщения Кафки:

.transform(...)
.split()
.channel(KAFKA_OUT_CHANNEL)

После того, как все сообщения были отправлены, мне нужно позвонить в службу, а также записать, сколько сообщений было обработано. Я понимаю, что подход заключается в использовании publishSubscribeChannel, когда первый subscribe выполняет фактическую отправку Kafka, а затем aggregate выполняет служебный вызов:

.transform(...)
.split().
.publishSubscribeChannel(pubSub -> pubSub
        .subscribe(f -> f.channel(KAFKA_OUT_CHANNEL)))

У меня проблемы с выяснением того, как на самом деле выполнить .aggregate часть в pubSubChannel с использованием DSL. Пока я пробовал:

.subscribe(f ->  f.channel(KAFKA_OUT_CHANNEL)
.subscribe(f -> f.aggregate(c -> c.processor( ?? ))))

Есть указатели?

Ответы [ 2 ]

0 голосов
/ 27 июня 2018

Это зависит от того, что вы хотите после агрегирования - если вы просто хотите получить список полезных нагрузок, просто используйте aggregate() ...

@SpringBootApplication
public class So51059703Application {

    public static void main(String[] args) {
        SpringApplication.run(So51059703Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(ApplicationContext context) {
        return args -> {
            context.getBean("flow.input", MessageChannel.class).send(new GenericMessage<>(
                    Arrays.asList("a", "b", "c")));
        };
    }

    @Bean
    public IntegrationFlow flow() {
        return f -> f
                .split()
                .publishSubscribeChannel(p -> p
                        .subscribe(f1 -> f1.handle(System.out::println))
                        .subscribe(f2 -> f2
                                .aggregate()
                                .handle(System.out::println)));
    }

}

Если вы просто хотите считать:

@SpringBootApplication
public class So51059703Application {

    public static void main(String[] args) {
        SpringApplication.run(So51059703Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(ApplicationContext context) {
        return args -> {
            context.getBean("flow.input", MessageChannel.class).send(new GenericMessage<>(
                    Arrays.asList("a", "b", "c")));
        };
    }

    @Bean
    public IntegrationFlow flow() {
        return f -> f
                .split()
                .publishSubscribeChannel(p -> p
                        .subscribe(f1 -> f1.handle(System.out::println))
                        .subscribe(f2 -> f2
                                .aggregate(c -> c
                                        .processor(processor(), "reduce"))
                                .handle(System.out::println)));
    }

    @Bean
    public Object processor() {
        return new Object() {

            public int reduce(List<Message<?>> messages) {
                return messages.size();
            }

        };
    }

}
0 голосов
/ 27 июня 2018

AbstractMessageSplitter имеет applySequence = true по умолчанию:

/**
 * Set the applySequence flag to the specified value. Defaults to true.
 * @param applySequence true to apply sequence information.
 */
public void setApplySequence(boolean applySequence) {

Имея в сообщениях следующие заголовки:

if (this.applySequence) {
    builder.pushSequenceDetails(correlationId, sequenceNumber, sequenceSize);
}

Стратегия корреляции Агрегатора по умолчанию действительно основана на заголовке IntegrationMessageHeaderAccessor.CORRELATION_ID. Таким образом, он собирает сообщения с одинаковым correlationKey в один MessageGroup. Значение по умолчанию ReleaseStrategy основано на сравнении MessageGroup и сравнении заголовка sequenceSize. В конце по умолчанию MessageGroupProcessor просто соберите все сообщения в группе в одно сообщение с Collection в качестве полезной нагрузки. Другими словами, поведение агрегатора по умолчанию абсолютно противоположно разделителю.

Я не знаю, какой вывод вы собираетесь делать из агрегатора, но вам не нужна никакая другая логика для его настройки - корреляция и освобождение логики должны основываться на состоянии по умолчанию.

Вы можете найти достаточно информации в Справочном руководстве .

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...