Моя loadRaw()
функция читает S3Stream
и вставляет данные в таблицу orderItems
.Чтобы проверить метод, я скачал файлы S3, содержащие необработанные данные за 1 день.Данные загружаются, но очень неэффективно, и между ними waiting for child threads
.Что я делаю не так в этом коде?-
def loadRaw(file: String, fileHost: String, batchSize: Int, bucket: String): Unit = {
val dataStream = fileHost match {
case "s3" => S3Stream(s3Client, bucket, file)
case "local" => LazyFileStream(file)
}
route match {
case 1 => {
dataStream.map(line => readJsonFromString(line)).grouped(memsqlBatchSize).foreach { recordSet =>
val dbRecords = recordSet.filter(f => OrderEvent.isValidCreateOrder(f)).map(OrderEvent.orderItemRowsFromCreateOrder(_).flatten.grouped(batchSize))
try {
dbRecords.foreach { batchedList =>
batchedList.foreach { record =>
try {
OrderEvent.setColumnValues(orderItemsInsert, record.toMap)
orderItemsInsert.addBatch()
} catch {
case e: Exception =>
logger.error(s"Error parsing order items. \n - Exception: ${e.getMessage}\n Event: ${eventToString(record.toMap)}")
ArgosLogger.sendError(ErrorType.PARSING_ERROR, Some(record.toMap), Some(e))
}
}
orderItemsInsert.executeBatch()
orderItemsInsert.clearBatch()
}
connection.commit
} catch {
case e: Exception =>
Try(connection.rollback())
throw e;
}
/* finally {
connection.commit
orderItemsInsert.clearBatch
logger.debug(s"committed: ${dbRecords.length.toString}")
}*/
}
}
case _ => {}
Здесь orderItemInsert
- это уже подготовленный оператор SQL, который выполняется после того, как данные заданы с использованием setColumnValues()
.
case "s3load" =>
val files = if (Cli.s3loader.loadManifest.isEmpty) Cli.s3loader.loadFiles.split(",").toList
else scala.io.Source.fromFile(Cli.s3loader.loadManifest).getLines.toList
val fileQueue = new java.util.concurrent.ConcurrentLinkedQueue[String](files)
val pool = Executors.newFixedThreadPool(Cli.s3loader.parallelism)
1 to Cli.s3loader.parallelism map {
index =>
pool.submit(MemSQLS3Loader(
Cli.s3loader.memsqlHost,
Cli.s3loader.targetTable,
Cli.s3loader.memsqlUserName,
Cli.s3loader.memsqlPassword,
Cli.s3loader.memsqlBatchSize,
Cli.s3loader.bucket,
Cli.s3loader.route,
fileQueue)
)
}
while (!fileQueue.isEmpty) {
logger.info("waiting for child threads")
Thread.sleep(5000L)
}
logger.info("starting pool termination")
pool.shutdown()
try {
pool.awaitTermination(60L, TimeUnit.SECONDS)
logger.info("pool terminated")
} catch {
case e: Throwable =>
throw (e)
}
Консоль для большей наглядности -