Как агрегировать сообщения с одинаковыми заголовками в Spring Integration - PullRequest
0 голосов
/ 31 января 2020
public static void main(String[] args) throws InterruptedException {
    AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext();
    ctx.register(Main.class);
    ctx.refresh();
    DirectChannel channel1 = ctx.getBean("channel1", DirectChannel.class);

    ctx.getBean("channel2", PublishSubscribeChannel.class).subscribe(message ->
           System.out.println("Output: " + message));

    channel1.send(MessageBuilder.withPayload("p1")
        .setHeader(CORRELATION_ID, 1)
        .setHeader(SEQUENCE_SIZE,2)
        .setHeader(SEQUENCE_NUMBER,1)
        .setHeader("a", 1)
        .build());

    channel1.send(MessageBuilder.withPayload("p2")
        .setHeader(CORRELATION_ID, 1)
        .setHeader(SEQUENCE_SIZE,2)
        .setHeader(SEQUENCE_NUMBER,2)
        .setHeader("a", 2)
        .build());
}

    @Bean
    public MessageChannel channel1() {
        return MessageChannels.direct().get();
    }

    @Bean
    public MessageChannel channel2() {
        return MessageChannels.publishSubscribe().get();
    }

    @Bean
    public IntegrationFlow flow1() {
        return IntegrationFlows
                .from("channel1")
                .aggregate(a -> a
                        .releaseStrategy(new SequenceSizeReleaseStrategy())
                        .expireGroupsUponCompletion(true)
                        .sendPartialResultOnExpiry(true))
                .channel("channel2")
                .get();
    }

Вывод: GenericMessage [payload = [p1, p2], заголовки = {sequenceNumber = 2, a = 2, correlationId = 1, id = b5e51041-c967-1bb4-1601-7e468ae28527, sequenceSize = 2, отметка времени = 1580475773518}]

Заголовки "a" и "sequenceNumber" были перезаписаны. Как агрегировать сообщения с одинаковыми заголовками?

Так и должно быть

Вывод: GenericMessage [payload = [p1, p2], заголовки = {sequenceNumber = [1,2] , a = [1, 2], correlationId = 1, id = b5e51041-c967-1bb4-1601-7e468ae28527, sequenceSize = 2, метка времени = 1580475773518}]

1 Ответ

0 голосов
/ 31 января 2020

См. AbstractAggregatingMessageGroupProcessor:

/**
 * Specify a {@link Function} to map {@link MessageGroup} into composed headers for output message.
 * @param headersFunction the {@link Function} to use.
 * @since 5.2
 */
public void setHeadersFunction(Function<MessageGroup, Map<String, Object>> headersFunction) {

, а также:

/**
 * The {@link Function} implementation for a default headers merging in the aggregator
 * component. It takes all the unique headers from all the messages in group and removes
 * those which are conflicted: have different values from different messages.
 *
 * @author Artem Bilan
 *
 * @since 5.2
 *
 * @see AbstractAggregatingMessageGroupProcessor
 */
public class DefaultAggregateHeadersFunction implements Function<MessageGroup, Map<String, Object>> {

Или просто давно существует:

/**
 * This default implementation simply returns all headers that have no conflicts among the group. An absent header
 * on one or more Messages within the group is not considered a conflict. Subclasses may override this method with
 * more advanced conflict-resolution strategies if necessary.
 * @param group The message group.
 * @return The aggregated headers.
 */
protected Map<String, Object> aggregateHeaders(MessageGroup group) {

Итак, что вам нужно в вашем aggregate() конфигурация является опцией outputProcessor(MessageGroupProcessor outputProcessor).

Для получения дополнительной информации см. Документы: https://docs.spring.io/spring-integration/docs/5.2.3.RELEASE/reference/html/message-routing.html#aggregatingmessagehandler

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...