Как улучшить производительность, выполнив загрузку более 130 элементов в aws s3 - PullRequest
9 голосов
/ 03 апреля 2019

Мне нужно перебрать более 130 объектов передачи данных, и каждый раз я буду генерировать json для загрузки в aws S3.

Без каких-либо улучшений требуется около 90 секунд на завершение всего процесса.Я попытался использовать lamba и не использовать lamba, результаты одинаковы для обоих.

for(AbstractDTO dto: dtos) {
    try {
        processDTO(dealerCode, yearPeriod, monthPeriod, dto);
    } catch (FileAlreadyExistsInS3Exception e) {
        failedToUploadDTOs.add(e.getLocalizedMessage() + ": " + dto.fileName() + ".json");
    }
}
dtos.stream().forEach(dto -> {
    try {
        processDTO(dealerCode, yearPeriod, monthPeriod, dto);
    } catch (FileAlreadyExistsInS3Exception e) {
        failedToUploadDTOs.add(e.getLocalizedMessage() + ": " + dto.fileName() + ".json");
    }
});

После некоторого исследования я пришел к выводу, что метод processDTO занимает около 0,650 мс на элемент для запуска.

Моя первая попытка состояла в том, чтобы использовать параллельные потоки , и результаты были довольно хорошими, для завершения всего процесса потребовалось 15 секунд :

dtos.parallelStream().forEach(dto -> {
    try {
        processDTO(dealerCode, yearPeriod, monthPeriod, dto);
    } catch (FileAlreadyExistsInS3Exception e) {
        failedToUploadDTOs.add(e.getLocalizedMessage() + ": " + dto.fileName() + ".json");
    }
});

Но мне все еще нужно уменьшить это время.Я исследовал возможности улучшения параллельных потоков и обнаружил уловку ForkJoinPool :

ForkJoinPool forkJoinPool = new ForkJoinPool(PARALLELISM_NUMBER);
forkJoinPool.submit(() ->
dtos.parallelStream().forEach(dto -> {
    try {
        processDTO(dealerCode, yearPeriod, monthPeriod, dto);
    } catch (FileAlreadyExistsInS3Exception e) {
        failedToUploadDTOs.add(e.getLocalizedMessage() + ": " + dto.fileName() + ".json");
    }
})).get();
forkJoinPool.shutdown();

К сожалению, результаты меня немного смутили.

  • Когда PARALLELISM_NUMBER8, это займет около 13 секунд , чтобы завершить весь процесс.Не большое улучшение.
  • Когда PARALLELISM_NUMBER равен 16, для завершения всего процесса требуется около 8 секунд .
  • Когда PARALLELISM_NUMBER равно 32, это занимает около 5 секунд для завершения всего процесса.

Все тесты были выполнены с использованием запросов почтальона, вызывая метод контроллера, который завершит итерацию 130 элементов

Я доволен 5 секундами, используя 32 в качестве PARALLELISM_NUMBER, но я беспокоюсь о последствиях.

  • Можно ли сохранить 32?
  • Что такое идеальный PARALLELISM_NUMBER?
  • Что нужно иметь в виду при определении его значения?

Я работаю на Mac 2.2 ГГц I7

sysctl hw.physicalcpu hw.logicalcp
hw.physicalcpu: 4
hw.logicalcpu: 8

Вот что processDTO делает:

private void processDTO(int dealerCode, int yearPeriod, int monthPeriod, AbstractDTO dto) throws FileAlreadyExistsInS3Exception {
    String flatJson = JsonFlattener.flatten(new JSONObject(dto).toString());
    String jsonFileName = dto.fileName() + JSON_TYPE;;
    String jsonFilePath = buildFilePathNew(dto.endpoint(), dealerCode, yearPeriod, monthPeriod, AWS_S3_JSON_ROOT_FOLDER);
    uploadFileToS3(jsonFilePath + jsonFileName, flatJson);
}
public void uploadFileToS3(String fileName, String fileContent) throws FileAlreadyExistsInS3Exception {
    if (s3client.doesObjectExist(bucketName, fileName)) {
        throw new FileAlreadyExistsInS3Exception(ErrorMessages.FILE_ALREADY_EXISTS_IN_S3.getMessage());
    }
    s3client.putObject(bucketName, fileName, fileContent);
}

Ответы [ 2 ]

4 голосов
/ 03 апреля 2019

Параметры parallelism определяют, сколько потоков будет использовано ForkJoinPool.Вот почему по умолчанию значение parallelism является доступным количеством ядер ЦП:

Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors())

В вашем случае необходимо проверить наличие файла и загрузить его на S3.Время здесь будет зависеть как минимум от нескольких факторов: процессор, сетевая карта и драйвер, операционная система и другие.Похоже, что время работы сети S3 не ограничено ЦП в вашем случае, так как вы наблюдаете улучшение, создавая больше рабочих потоков моделирования, возможно, сетевой запрос ставится в очередь операционной системой.

Правильное значение для parallelismварьируется от одного типа рабочей нагрузки к другому.Рабочий процесс с привязкой к ЦП лучше, по умолчанию parallelism равен ядру ЦП из-за негативного влияния переключения контекста.Рабочая нагрузка, не связанная с ЦП, такая как ваша, может быть увеличена за счет увеличения числа рабочих потоков, при условии, что рабочая нагрузка не будет блокировать ЦП, например, из-за занятого ожидания .

Не существует единого идеального значения для parallelism в ForkJoinPool.

0 голосов
/ 03 апреля 2019

Мне удалось сократить до 8 секунд благодаря всем вашим полезным советам и объяснениям.

Поскольку узким местом была загрузка на aws s3, и вы упомянули неблокирующий API на aws, после некоторых исследований я обнаружил, что класс TransferManager содержит неблокирующую загрузка.

TransferManager class

Таким образом, вместо того, чтобы использовать ForkJoinPool для увеличения количества потоков, я сохранил простой parallelStream:

        dtos.parallelStream().forEach(dto -> {
            try {
                processDTO(dealerCode, yearPeriod, monthPeriod, dto);
            } catch (FileAlreadyExistsInS3Exception e) {
                failedToUploadDTOs.add(e.getLocalizedMessage() + ": " + dto.fileName() + ".json");
            }
        });

И метод uploadToS3 немного изменился, вместо использования AmazonS3 , я использовал TransferManager :

public Upload uploadAsyncFileToS3(String fileName, String fileContent) throws FileAlreadyExistsInS3Exception {
        if (s3client.doesObjectExist(bucketName, fileName)) {
            throw new FileAlreadyExistsInS3Exception(ErrorMessages.FILE_ALREADY_EXISTS_IN_S3.getMessage());
        }
        InputStream targetStream = new ByteArrayInputStream(fileContent.getBytes());
        ObjectMetadata metadata = new ObjectMetadata();
        metadata.setContentLength(fileContent.getBytes().length);
        return transferManager.upload(bucketName, fileName, targetStream, metadata);
}

Таким образом, когда вызывается загрузка, она не ждет, пока она завершится, позволяя обработать другой DTO. Когда все DTO обработаны, я проверяю их статус загрузки, чтобы увидеть возможные ошибки (за исключением первого forEach)

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...