Вот пример агрегатора :
from("file:///somePath/consume/?maxMessagesPerPoll=2&delay=5000")
.aggregate(constant(true), new ZipAggregationStrategy()).completion(exchange -> exchange.getProperty("CamelBatchComplete", Boolean.class))
.to("file:///somePath/produce/")
Здесь maxMessagesPerPoll определяет, сколько файлов будет заархивировано. Но если их количество в папке меньше значения maxMessagesPerPoll, он будет ожидать пропущенных файлов для полного архива. Вот пример ZipAggregationStrategy:
private static class ZipAggregationStrategy implements AggregationStrategy {
private ZipOutputStream zipOutputStream;
private ByteArrayOutputStream out;
@Override
public Exchange aggregate(final Exchange oldExchange, final Exchange newExchange) {
try {
if (oldExchange == null) {
out = new ByteArrayOutputStream();
zipOutputStream = new ZipOutputStream(out);
}
createEntry(newExchange);
return newExchange;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private void createEntry(final Exchange exchange) throws Exception {
final ZipEntry zipEntry = new ZipEntry(exchange.getIn().getHeader(Exchange.FILE_NAME, String.class));
zipOutputStream.putNextEntry(zipEntry);
byte[] bytes = new byte[1024];
int length;
try (InputStream body = exchange.getIn().getBody(InputStream.class)) {
while ((length = body.read(bytes)) >= 0) {
zipOutputStream.write(bytes, 0, length);
}
}
}
@Override
public void onCompletion(final Exchange exchange) {
try {
zipOutputStream.close();
exchange.getIn().setBody(new ByteArrayInputStream(out.toByteArray()));
exchange.getIn().setHeader(Exchange.FILE_NAME, "someArchive.zip");
}catch (Exception e){
throw new RuntimeException(e);
} finally {
try {
out.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
Это пример в памяти. Вы можете улучшить это, например, с помощью временного файла. И вы всегда можете создать свой собственный предикат завершения на основе вашей логики c.
UPD: я думаю, что ссылка на документацию временно недоступна