BatchWrite в DynamoDB с использованием службы Executor - PullRequest
0 голосов
/ 19 июня 2020

У меня есть служба emr, которая выполняет следующие операции:

Загрузка данных из S3 Выполнение преобразования, проверки работоспособности, проверки Загрузить данные в таблицу DDb Служба загружает в таблицу около 5 миллионов записей. Чтобы сократить время загрузки, я использовал службу исполнителя для выполнения пакетной записи из нескольких параллельных потоков.

Я заметил, что я не получаю никакого выигрыша, если у меня более 5 потоков в службе-исполнителе. Я был бы признателен, если бы кто-то, кто сделал что-то подобное, поделился бы своим мнением относительно максимизации пропускной способности DynamoDB. Наша подготовленная пропускная способность действительно высока (10 КБ записи), но я не могу превысить 1000 операций записи / с даже после распараллеливания.

Ключ раздела: employeeId и каждая строка будет иметь около 200 КБ данных Вот пример службы исполнителя:

val BATCH_THRESHOLD = 25

     private def saveRecords(employeeJobDataList: List[AnytimePayEmployeeJobData]): 
 List[FailedBatch] = {
val upsertsFailedBatches: List[FailedBatch] = new ArrayList[FailedBatch]
val executor = ExecutionContext.
fromExecutorService(Executors.newFixedThreadPool(20, new ThreadFactoryBuilder().build()))
var i = 0
while (i < employeeJobDataList.size()){

val startIndex = i
var endIndex = i + BATCH_THRESHOLD

if(startIndex + BATCH_THRESHOLD >= employeeJobDataList.size()){
endIndex = employeeJobDataList.size()
}

executor.submit(new Runnable {
override def run(): Unit = {
  try{
   upsertsFailedBatches.addAll(DDBOperations.batchSaveInDDB(employeeJobDataList.subList(startIndex, endIndex)))
  } catch {
    case e: Throwable => {
      log.error("Executor Service failed in inserting data into DynamoDb Table")
      throw e
    }
  }
}
})
 i = i + BATCH_THRESHOLD}

executor.shutdown()
while (!executor.isTerminated()) {   }
 }
...