Написать службу исполнителя для обработки в пакете из 100 записей - PullRequest
0 голосов
/ 24 марта 2020
private void startProcess() throws Exception {

        ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request();
        listObjectsV2Request.setBucketName(bucket);
        listObjectsV2Request.setPrefix(prefix);
        ListObjectsV2Result listObjectsV2Result;

        do {
            listObjectsV2Result = s3Client.listObjectsV2(listObjectsV2Request);
            execute(listObjectsV2Result.getObjectSummaries());
            listObjectsV2Request.setContinuationToken(listObjectsV2Result.getNextContinuationToken());
        } while(listObjectsV2Result.isTruncated());

    }


    public void execute(List<S3ObjectSummary> s3ObjectSummaries) throws Exception{
        List<List<S3ObjectSummary>> results = partition(s3ObjectSummaries, 100);

        List<Future<?>> futureResults = new ArrayList<>();
        for (List<S3ObjectSummary> summaries : results) {
            futureResults.add(executor.submit(() -> {
                process(summaries);

                return true;
            }));
        }
        for (Future<?> asyncResult : futureResults) {
            asyncResult.get();
        }
    }

    private List<List<S3ObjectSummary>> partition(List<S3ObjectSummary> list, Integer partitionSize) {
        int numberOfLists = BigDecimal.valueOf(list.size())
                .divide(BigDecimal.valueOf(partitionSize), 0, CEILING)
                .intValue();

        return IntStream.range(0, numberOfLists)
                .mapToObj(it -> list.subList(it * partitionSize, Math.min((it+1) * partitionSize, list.size())))
                .collect(Collectors.toList());
    }

Приведенный выше код извлекает 1000 записей для каждого следующего токена. Я хотел обработать 1000 записей, разделив их на 100 пакетов, и каждые 100 записей были отправлены в качестве задачи в службу потоков исполнителя. Таким образом, должно быть 10 пулов потоков, и каждый пул выполняет 100 записей асинхронно. Есть ли какой-нибудь пример, который может помочь моему сценарию? Должен ли я перебрать 1000 записей и поместить их в карту первым. Для каждой сотой записи в итерации я отправляю карту в службу-исполнитель, а flu sh карту для следующей итерации и далее? Это правильный подход? какие-либо предложения?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...