Как получить оптимальную частоту вставки в DynamoDb через Executor Framework в Java? - PullRequest
2 голосов
/ 28 июня 2019

Я делаю POC при массовой записи (около 5,5 тыс. Элементов) в локальной базе данных Dynamo, используя DynamoDB SDK для Java. Я знаю, что каждая массовая запись не может иметь более 25 операций записи, поэтому я делю весь набор данных на куски по 25 элементов в каждой. Затем я передаю эти куски как вызываемые действия в среде Executor. Тем не менее, я не получаю удовлетворительного результата, так как записи 5.5k вставляются более чем за 100 секунд.

Я не уверен, как еще я могу оптимизировать это. При создании таблицы я выделил WriteCapacityUnit как 400 (не уверен, какое максимальное значение я могу дать) и немного поэкспериментировал с ним, но это никогда не имело никакого значения. Я также попытался изменить количество потоков в исполнителе.

Это основной код для выполнения операции массовой записи:


    public static void main(String[] args) throws Exception {

        AmazonDynamoDBClient client = new AmazonDynamoDBClient().withEndpoint("http://localhost:8000");

        final AmazonDynamoDB aws = new AmazonDynamoDBClient(new BasicAWSCredentials("x", "y"));
        aws.setEndpoint("http://localhost:8000");

        JSONArray employees = readFromFile();
        Iterator<JSONObject> iterator = employees.iterator();

        List<WriteRequest> batchList = new ArrayList<WriteRequest>();

        ExecutorService service = Executors.newFixedThreadPool(20);

        List<BatchWriteItemRequest> listOfBatchItemsRequest = new ArrayList<>();
        while(iterator.hasNext()) {
            if (batchList.size() == 25) {
                Map<String, List<WriteRequest>> batchTableRequests = new HashMap<String, List<WriteRequest>>();
                batchTableRequests.put("Employee", batchList);
                BatchWriteItemRequest batchWriteItemRequest = new BatchWriteItemRequest();
                batchWriteItemRequest.setRequestItems(batchTableRequests);
                listOfBatchItemsRequest.add(batchWriteItemRequest);
                batchList = new ArrayList<WriteRequest>();
            }
            PutRequest putRequest = new PutRequest();
            putRequest.setItem(ItemUtils.fromSimpleMap((Map) iterator.next()));
            WriteRequest writeRequest = new WriteRequest();
            writeRequest.setPutRequest(putRequest);
            batchList.add(writeRequest);
        }

        StopWatch watch = new StopWatch();
        watch.start();

        List<Future<BatchWriteItemResult>> futureListOfResults = listOfBatchItemsRequest.stream().
                map(batchItemsRequest -> service.submit(() -> aws.batchWriteItem(batchItemsRequest))).collect(Collectors.toList());

        service.shutdown();

        while(!service.isTerminated());

        watch.stop();
        System.out.println("Total time taken : " + watch.getTotalTimeSeconds());

    }

}

Этот код используется для создания таблицы DynamoDB:

    public static void main(String[] args) throws Exception {
        AmazonDynamoDBClient client = new AmazonDynamoDBClient().withEndpoint("http://localhost:8000");

        DynamoDB dynamoDB = new DynamoDB(client);
        String tableName = "Employee";
        try {
            System.out.println("Creating the table, wait...");
            Table table = dynamoDB.createTable(tableName, Arrays.asList(new KeySchemaElement("ID", KeyType.HASH)

            ), Arrays.asList(new AttributeDefinition("ID", ScalarAttributeType.S)),
                    new ProvisionedThroughput(1000L, 1000L));
            table.waitForActive();
            System.out.println("Table created successfully.  Status: " + table.getDescription().getTableStatus());

        } catch (Exception e) {
            System.err.println("Cannot create the table: ");
            System.err.println(e.getMessage());
        }
    }
...