Синхронизация скорости чтения из БД и записи вasticsearch с использованием потока Akka grpc - PullRequest
0 голосов
/ 10 января 2019

Здесь мы разработали несколько сервисов, каждый из которых использует акторов akka, а связь между сервисами осуществляется через GRPC Akka. Существует один сервис, который заполняет базу данных в памяти, а другой сервис, называемый Reader, применяет некоторые данные запроса и формы, а затем передает их в Службу эластичного поиска для вставки / обновления. Объем данных на каждом этапе чтения составляет около 1 млн строк. Проблема возникает, когда Reader передает большой объем данных, так чтоasticsearch не может обработать их и вставить / обновить их все.

Я использовал метод потока akka для связи этих двух служб. Я также использую scalike jdbc lib и код, как показано ниже, для чтения и вставки пакетных данных вместо целых.

def applyQuery(query: String,mergeResult:Map[String, Any] => Unit) = {
  val publisher = DB readOnlyStream {
    SQL(s"${query}").map(_.toMap()).list().fetchSize(100000)
      .iterator()
  }

  Source.fromPublisher(publisher).runForeach(mergeResult)
}
////////////////////////////////////////////////////////
var batchRows: ListBuffer[Map[String, Any]] = new ListBuffer[Map[String, Any]]
val batchSize: Int = 100000
def mergeResult(row:Map[String, Any]):Unit = {
    batchRows :+= row

    if (batchRows.size == batchSize) {

      send2StorageServer(readyOutput(batchRows))
      batchRows.clear()
    }
  }

  def readyOutput(res: ListBuffer[Map[String, Any]]):ListBuffer[StorageServerRequest] = {

// code to format res  
  }

Теперь, когда используется команда 'foreach', она значительно замедляет операции. Я пробовал другой размер партии, но это не имело смысла. Я ошибаюсь при использовании команды foreach или есть лучший способ решить проблему скорости, используя поток akka, поток и т. Д.

1 Ответ

0 голосов
/ 12 января 2019

Я обнаружил, что операция для добавления к ListBuffer равна

batchRows + = строка

, но использование :+ не приводит к ошибке, но очень неэффективно, поэтому при использовании правильного оператора foreach больше не замедляется, хотя проблема со скоростью снова возникает. На этот раз чтение данных происходит быстро, но запись вasticsearch идет медленно.

После некоторых поисков я нашел следующие решения: 1. Может помочь использование очереди в качестве буфера между базой данных иasticsearch. 2. Также, если блокировка операции чтения до тех пор, пока запись не будет выполнена, не является дорогостоящей, это может быть другое решение.

...