Я пишу в распределенную базу данных в памяти, размер пакета которой определяется пользователем в многопоточной среде.Но я хочу ограничить количество строк, записанных в ex.1000 строк / секПричина этого требования заключается в том, что мой производитель пишет слишком быстро, а потребитель сталкивается с ошибкой конечной памяти.Существует ли какая-либо стандартная практика выполнения регулирования при пакетной обработке записей.
dataStream.map(line => readJsonFromString(line)).grouped(memsqlBatchSize).foreach { recordSet =>
val dbRecords = recordSet.map(m => (m, Events.transform(m)))
dbRecords.map { record =>
try {
Events.setValues(eventInsert, record._2)
eventInsert.addBatch
} catch {
case e: Exception =>
logger.error(s"error adding batch: ${e.getMessage}")
val error_event = Events.jm.writeValueAsString(mapAsJavaMap(record._1.asInstanceOf[Map[String, Object]]))
logger.error(s"event: $error_event")
}
}
// Bulk Commit Records
try {
eventInsert.executeBatch
} catch {
case e: java.sql.BatchUpdateException =>
val updates = e.getUpdateCounts
logger.error(s"failed commit: ${updates.toString}")
updates.zipWithIndex.filter { case (v, i) => v == Statement.EXECUTE_FAILED }.foreach { case (v, i) =>
val error = Events.jm.writeValueAsString(mapAsJavaMap(dbRecords(i)._1.asInstanceOf[Map[String, Object]]))
logger.error(s"insert error: $error")
logger.error(e.getMessage)
}
}
finally {
connection.commit
eventInsert.clearBatch
logger.debug(s"committed: ${dbRecords.length.toString}")
}
}
Я надеялся, что смогу передать заданные пользователем аргументы как throttleMax, и если общее количество записей, записанных каждым потоком, достигнет throttleMax, thread.sleep () будет вызываться в течение 1 секунды.Но это сделает весь процесс очень медленным.Может ли быть какой-либо другой эффективный метод, который можно использовать для ускорения загрузки данных до 1000 строк / сек?