Групповые сообщения Spring-интеграции по классификатору - PullRequest
0 голосов
/ 28 марта 2019

С потоками Java 8 я могу группировать сообщения по классификатору:

Map<String, List<String>> grouped = Arrays.asList("a", "b", "b", "b", "c")
        .stream()
        .collect(Collectors.groupingBy(Function.identity()));

Я хочу написать агрегатор, который соответственно группирует сообщения.Из пяти сообщений с полезными нагрузками, как показано выше, я хочу создать три сообщения: первое должно иметь "a" в качестве полезной нагрузки, второе должно иметь список из трех "b" в качестве полезной нагрузки, третье должно иметь "c" в качестве полезной нагрузки.

Все группы сообщений должны быть освобождены после достижения размера последовательности.Группировка на основе полезной нагрузки работает нормально, но группы сообщений никогда не освобождаются.

В стратегии выпуска у меня есть доступ к размеру последовательности, но я не могу узнать общее количество элементов, которые были обработаны.Как я могу выпустить мои сгруппированные сообщения?

public interface StringGrouper {
    List<Message<?> groupSame(List<String> toGroup);
}   

@Bean
public IntegrationFlow groupStringsFlow() {
    return IntegrationFlows.from(StringGrouper.class)
        .split()
        .aggregate(agg -> agg
            .correlationStrategy(message -> message.getPayload())
            .releaseStrategy(group -> group.getSequenceSize() == /* what? */)) 
        .logAndReply();
}

@Test
public void shouldGroupMessages {
    List<Message<?> grouper
        .groupSame(Arrays.asList("a", "b", "b", "b", "c"));
}

Обходной путь - вообще не использовать агрегатор и вместо этого группировать входящий список в преобразователе.Но я ожидаю, что я могу использовать агрегатор для этого.

@Bean
public IntegrationFlow groupStringsFlow() {
    return IntegrationFlows.from(StringGrouper.class)
        .<List<String>, Collection<List<String>>>transform(source -> source.stream()
            .collect(Collectors.collectingAndThen(
                Collectors.groupingBy(Function.identity()), 
                grouped -> grouped.values())))
        .split()
        .log() // work with messages
        .aggregate()
        .get();
}

1 Ответ

1 голос
/ 28 марта 2019

Объедините их в одну группу со стратегией выпуска размера последовательности по умолчанию и используйте пользовательский выходной процессор (MessageGroupProcessor) для перегруппировки полезной нагрузки, возвращая Collection<Message<List<?>>>.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...