У меня есть служба emr, которая выполняет следующие операции:
Загрузка данных из S3 Выполнение преобразования, проверки работоспособности, проверки Загрузить данные в таблицу DDb Служба загружает в таблицу около 5 миллионов записей. Чтобы сократить время загрузки, я использовал службу исполнителя для выполнения пакетной записи из нескольких параллельных потоков.
Я заметил, что я не получаю никакого выигрыша, если у меня более 5 потоков в службе-исполнителе. Я был бы признателен, если бы кто-то, кто сделал что-то подобное, поделился бы своим мнением относительно максимизации пропускной способности DynamoDB. Наша подготовленная пропускная способность действительно высока (10 КБ записи), но я не могу превысить 1000 операций записи / с даже после распараллеливания.
Ключ раздела: employeeId и каждая строка будет иметь около 200 КБ данных Вот пример службы исполнителя:
val BATCH_THRESHOLD = 25
private def saveRecords(employeeJobDataList: List[AnytimePayEmployeeJobData]):
List[FailedBatch] = {
val upsertsFailedBatches: List[FailedBatch] = new ArrayList[FailedBatch]
val executor = ExecutionContext.
fromExecutorService(Executors.newFixedThreadPool(20, new ThreadFactoryBuilder().build()))
var i = 0
while (i < employeeJobDataList.size()){
val startIndex = i
var endIndex = i + BATCH_THRESHOLD
if(startIndex + BATCH_THRESHOLD >= employeeJobDataList.size()){
endIndex = employeeJobDataList.size()
}
executor.submit(new Runnable {
override def run(): Unit = {
try{
upsertsFailedBatches.addAll(DDBOperations.batchSaveInDDB(employeeJobDataList.subList(startIndex, endIndex)))
} catch {
case e: Throwable => {
log.error("Executor Service failed in inserting data into DynamoDb Table")
throw e
}
}
}
})
i = i + BATCH_THRESHOLD}
executor.shutdown()
while (!executor.isTerminated()) { }
}