private void startProcess() throws Exception {
ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request();
listObjectsV2Request.setBucketName(bucket);
listObjectsV2Request.setPrefix(prefix);
ListObjectsV2Result listObjectsV2Result;
do {
listObjectsV2Result = s3Client.listObjectsV2(listObjectsV2Request);
execute(listObjectsV2Result.getObjectSummaries());
listObjectsV2Request.setContinuationToken(listObjectsV2Result.getNextContinuationToken());
} while(listObjectsV2Result.isTruncated());
}
public void execute(List<S3ObjectSummary> s3ObjectSummaries) throws Exception{
List<List<S3ObjectSummary>> results = partition(s3ObjectSummaries, 100);
List<Future<?>> futureResults = new ArrayList<>();
for (List<S3ObjectSummary> summaries : results) {
futureResults.add(executor.submit(() -> {
process(summaries);
return true;
}));
}
for (Future<?> asyncResult : futureResults) {
asyncResult.get();
}
}
private List<List<S3ObjectSummary>> partition(List<S3ObjectSummary> list, Integer partitionSize) {
int numberOfLists = BigDecimal.valueOf(list.size())
.divide(BigDecimal.valueOf(partitionSize), 0, CEILING)
.intValue();
return IntStream.range(0, numberOfLists)
.mapToObj(it -> list.subList(it * partitionSize, Math.min((it+1) * partitionSize, list.size())))
.collect(Collectors.toList());
}
void process(List< S3ObjectSummary> list){
// Process the summary here
}
Приведенный выше код извлекает 1000 записей для каждого следующего токена. Я хотел обработать 1000 записей, разделив их на 100 пакетов, и каждые 100 записей были отправлены в качестве задачи в службу потоков исполнителя. Таким образом, должен быть 1 пул потоков с 10 потоками и каждый поток, выполняющий 100 сводок объектов. Когда начинается первый поток и возникает исключение из-за первой записи, весь поток прерывается, и я не вижу выполнения следующих 99 записей. То же самое происходит и с другими потоками. Как убедиться, что поток не прерывает и продолжает обработку следующих записей, даже если есть исключение?