Это Агрегатор EIP .
from("file:inputFolder")
.aggregate(constant(true), AggregationStrategies.groupedExchange())
.completionSize(2) //Wait for two files
.completionTimeout(60000) //Or process single file, if completionSize was not fulfilled within one minute
.to("log:do_something") //Here you can access List<Exchange> from message body
Для группировки сообщений вы можете использовать корреляцию Expression
.Для вашего примера (группировать сообщения по префиксу имени файла перед _
) это может быть что-то вроде этого:
private final Expression CORRELATION_EXPRESSION = new Expression() {
@Override
public <T> T evaluate(Exchange exchange, Class<T> type) {
final String fileName = exchange.getIn().getHeader(Exchange.FILE_NAME, String.class);
final String correlationExpression = fileName.substring(0, fileName.indexOf('_'));
return exchange.getContext().getTypeConverter().convertTo(
type,
correlationExpression
);
}
};
И передать его Aggregator
:
from("file:inputDirectory")
.aggregate(CORRELATION_EXPRESSION, AggregationStrategies.groupedExchange())
...
См. Эту сутьдля полного примера https://gist.github.com/bedlaj/a2a56aa9291bced8c0a8edebacaf22b0