С потоками Java 8 я могу группировать сообщения по классификатору:
Map<String, List<String>> grouped = Arrays.asList("a", "b", "b", "b", "c")
.stream()
.collect(Collectors.groupingBy(Function.identity()));
Я хочу написать агрегатор, который соответственно группирует сообщения.Из пяти сообщений с полезными нагрузками, как показано выше, я хочу создать три сообщения: первое должно иметь "a"
в качестве полезной нагрузки, второе должно иметь список из трех "b"
в качестве полезной нагрузки, третье должно иметь "c"
в качестве полезной нагрузки.
Все группы сообщений должны быть освобождены после достижения размера последовательности.Группировка на основе полезной нагрузки работает нормально, но группы сообщений никогда не освобождаются.
В стратегии выпуска у меня есть доступ к размеру последовательности, но я не могу узнать общее количество элементов, которые были обработаны.Как я могу выпустить мои сгруппированные сообщения?
public interface StringGrouper {
List<Message<?> groupSame(List<String> toGroup);
}
@Bean
public IntegrationFlow groupStringsFlow() {
return IntegrationFlows.from(StringGrouper.class)
.split()
.aggregate(agg -> agg
.correlationStrategy(message -> message.getPayload())
.releaseStrategy(group -> group.getSequenceSize() == /* what? */))
.logAndReply();
}
@Test
public void shouldGroupMessages {
List<Message<?> grouper
.groupSame(Arrays.asList("a", "b", "b", "b", "c"));
}
Обходной путь - вообще не использовать агрегатор и вместо этого группировать входящий список в преобразователе.Но я ожидаю, что я могу использовать агрегатор для этого.
@Bean
public IntegrationFlow groupStringsFlow() {
return IntegrationFlows.from(StringGrouper.class)
.<List<String>, Collection<List<String>>>transform(source -> source.stream()
.collect(Collectors.collectingAndThen(
Collectors.groupingBy(Function.identity()),
grouped -> grouped.values())))
.split()
.log() // work with messages
.aggregate()
.get();
}