Здесь мы разработали несколько сервисов, каждый из которых использует акторов akka, а связь между сервисами осуществляется через GRPC Akka. Существует один сервис, который заполняет базу данных в памяти, а другой сервис, называемый Reader, применяет некоторые данные запроса и формы, а затем передает их в Службу эластичного поиска для вставки / обновления. Объем данных на каждом этапе чтения составляет около 1 млн строк.
Проблема возникает, когда Reader передает большой объем данных, так чтоasticsearch не может обработать их и вставить / обновить их все.
Я использовал метод потока akka для связи этих двух служб. Я также использую scalike jdbc lib и код, как показано ниже, для чтения и вставки пакетных данных вместо целых.
def applyQuery(query: String,mergeResult:Map[String, Any] => Unit) = {
val publisher = DB readOnlyStream {
SQL(s"${query}").map(_.toMap()).list().fetchSize(100000)
.iterator()
}
Source.fromPublisher(publisher).runForeach(mergeResult)
}
////////////////////////////////////////////////////////
var batchRows: ListBuffer[Map[String, Any]] = new ListBuffer[Map[String, Any]]
val batchSize: Int = 100000
def mergeResult(row:Map[String, Any]):Unit = {
batchRows :+= row
if (batchRows.size == batchSize) {
send2StorageServer(readyOutput(batchRows))
batchRows.clear()
}
}
def readyOutput(res: ListBuffer[Map[String, Any]]):ListBuffer[StorageServerRequest] = {
// code to format res
}
Теперь, когда используется команда 'foreach', она значительно замедляет операции. Я пробовал другой размер партии, но это не имело смысла. Я ошибаюсь при использовании команды foreach
или есть лучший способ решить проблему скорости, используя поток akka, поток и т. Д.