Как реализовать параллельную обработку в akka? - PullRequest
0 голосов
/ 12 июня 2018

У меня есть метод, в котором есть несколько обращений к БД.Поскольку я не реализовал никакой параллельной обработки, 2-й вызов БД должен ждать, пока не завершится 1-й вызов БД, 3-й должен дождаться завершения 2-го и т. Д.

Все вызовы БД не зависят от каждогоДругой.Я хочу сделать это таким образом, чтобы все вызовы БД выполнялись одновременно.

Я новичок в инфраструктуре Akka.

Может кто-нибудь помочь мне с небольшим образцом или ссылки помогут.Приложение разработано в Scala Lang.

Ответы [ 2 ]

0 голосов
/ 03 января 2019

Если вы хотите запросить базу данных, вы должны использовать что-то вроде slick , которая является современной библиотекой запросов и доступа к базе данных для Scala.

быстрый пример slick:

case class User(id: Option[Int], first: String, last: String)

class Users(tag: Tag) extends Table[User](tag, "users") {
  def id = column[Int]("id", O.PrimaryKey, O.AutoInc)
  def first = column[String]("first")
  def last = column[String]("last")
  def * = (id.?, first, last) <> (User.tupled, User.unapply)
}
val users = TableQuery[Users]

тогда вам нужно создать конфигурацию для вашей базы данных:

mydb = {
  dataSourceClass = "org.postgresql.ds.PGSimpleDataSource"
  properties = {
    databaseName = "mydb"
    user = "myuser"
    password = "secret"
  }
  numThreads = 10
}

, и в вашем коде вы загружаете конфигурацию:

val db = Database.forConfig("mydb")

, затем выполните свой запрос с помощью метода db.run, которыйдает вам будущее как результат, например, вы можете получить все строки, вызвав метод result

val allRows: Future[Seq[User]] = db.run(users.result)

этот запрос выполняется без блокировки текущего потока.

Если у вас есть задача, выполнение которой занимает много времениили для вызова другой службы вы должны использовать фьючерсы.

Примером этого является простой HTTP-вызов внешней службы.Вы можете найти пример в здесь

Если у вас есть задача, выполнение которой занимает много времени и для этого, вы должны сохранять изменяемые состояния, в этом случае лучшим вариантом будет использование Akka Actorsкоторые инкапсулируют ваше состояние внутри субъекта, который решает проблему параллелизма и безопасности потоков настолько просто, насколько это возможно. Примеры задач suck:

import akka.actor.Actor

import scala.concurrent.Future

case class RegisterEndpoint(endpoint: String)

case class NewUpdate(update: String)

class UpdateConsumer extends Actor {
  val endpoints = scala.collection.mutable.Set.empty[String]

  override def receive: Receive = {

    case RegisterEndpoint(endpoint) =>
      endpoints += endpoint

    case NewUpdate(update) =>
      endpoints.foreach { endpoint =>
        deliverUpdate(endpoint, update)
      }
  }

  def deliverUpdate(endpoint: String, update: String): Future[Unit] = {
    Future.successful(Unit)
  }

}

Если вы хотите обрабатывать огромное количество живых данных или соединение через веб-сокет,обработка файла CSV, который растет со временем, ... или т. д., лучший вариант - поток Akka.Например, чтение данных из темы кафки с помощью Alpakka: Alpakka kafka разъем

0 голосов
/ 13 июня 2018

Существует три основных способа достижения параллелизма для нужд данного примера.

Фьючерсы

Для конкретного варианта использования, который задается в вопросеЯ бы порекомендовал Futures перед любой конструкцией akka.

Предположим, нам даны вызовы базы данных в виде функций:

type Data = ???

val dbcall1 : () => Data = ???

val dbcall2 : () => Data = ???

val dbcall3 : () => Data = ???

Параллельность может быть легко применена, а затем результаты могут быть собраны, используя Futures:

val f1 = Future { dbcall1() }
val f2 = Future { dbcall2() }
val f3 = Future { dbcall3() }

for {
  v1 <- f1
  v2 <- f2
  v3 <- f3
} {
  println(s"All data collected: ${v1}, ${v2}, ${v3}")
}

Akka Streams

Существует аналогичный ответ стека , который демонстрирует, как использовать библиотеку akka-stream для одновременных запросов в БД.

Akka Actors

Также можно написать Actor для выполнения запроса:

object MakeQuery

class DBActor(dbCall : () => Data) extends Actor {
  override def receive = {
    case _ : MakeQuery => sender ! dbCall()
  }
}

val dbcall1ActorRef = system.actorOf(Props(classOf[DBActor], dbcall1)) 

Однако, в этом случае использования Актеры менее полезны, потому что вы по-прежнемунеобходимо собрать все данные вместе.

Вы можете использовать ту же технику, что и в разделе «Фьючерсы»:

val f1 : Future[Data] = (dbcall1ActorRef ? MakeQuery).mapTo[Data]

for {
  v1 <- f1
  ...

Или вам придется соединять актеров вручнуючерез конструктор и обработайте всю логику обратного вызова для ожидания на другом акторе:

class WaitingDBActor(dbCall : () => Data, previousActor : ActorRef) {
  override def receive = {
    case _ : MakeQuery => previousActor forward MakeQuery

    case previousData : Data => sender ! (dbCall(), previousData)
  }
}
...