Можно ли построить HTTP-сервер OLTP / CRUD, используя AkkaHttp, AkkaStreams, Alpakka и базу данных? - PullRequest
1 голос
/ 10 февраля 2020

Для меня очевидно, что использование актеров, конечно, возможно: например, https://github.com/chbatey/akka-http-typed.git использует AkkaHttp и набирает актеров.

Но мне непонятно, если просто использовать AkkaStreams и его библиотека коннекторов Alpakka (которая включает в себя базы данных), если это возможно, для выполнения обычных служб CRUD / OLTP, или просто репликации данных из одной базы данных в другую, или другой сценарий OLAP / пакетной / потоковой обработки ios.

Если вы знаете, как это можно сделать, укажите, пожалуйста, несколько деталей, и если вы можете привести пример на github, например, это было бы здорово.

Я думаю, что это возможно, так как Сервер участвует в двух диалогах / преобразовании потока с состоянием: один с внешним миром по HTTP, а другой с базой данных. Я не уверен, что это можно смоделировать таким образом.

https://doc.akka.io/docs/alpakka/current/slick.html, кажется, предлагает как UPDATE / INSERTS в качестве Sink, так и указал SELECT на определенный идентификатор в качестве Источник. Знаете ли вы, есть ли пример приложения, или вы можете широко упомянуть, как проводка будет происходить с Akka Http?

Ответы [ 2 ]

2 голосов
/ 10 февраля 2020

Я разместил здесь демонстрацию, надеюсь, она вам поможет.

Создание таблицы, базы данных: mysql.

CREATE TABLE test(id VARCHAR(32))

sbt:

"com.lightbend.akka"                        %% "akka-stream-alpakka-slick"     % "1.1.0",
"mysql"                                      % "mysql-connector-java"          % "5.1.40"

Код:

package tech.parasol.scala.crud

import java.sql.SQLException

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives.{complete, get, path, _}
import akka.stream.alpakka.slick.scaladsl.{Slick, SlickSession}
import akka.stream.scaladsl.Sink
import akka.stream.{ActorAttributes, ActorMaterializer, Supervision}
import com.typesafe.config.ConfigFactory

import scala.concurrent.Future
import scala.io.StdIn
import scala.util.{Failure, Success}

object CrudTest1 {


  def main(args: Array[String]): Unit = {

    implicit val system = ActorSystem("CrudTest1")
    implicit val materializer = ActorMaterializer()
    implicit val executionContext = system.dispatcher




    val hostName = "120.0.0.1"
    val rocketDbConfig =
      s"""
         |db-config {
         |  profile = "slick.jdbc.MySQLProfile$$"
         |  db {
         |    dataSourceClass = "slick.jdbc.DriverDataSource"
         |    properties = {
         |      driver = "com.mysql.jdbc.Driver"
         |      url = "jdbc:mysql://${hostName}:3306/rocket?useUnicode=true&characterEncoding=utf8&rewriteBatchedStatements=true&useSSL=false"
         |      user = "root"
         |      password = "passw0rd"
         |    }
         |  }
         |}
         |
     """.stripMargin


    implicit val session = SlickSession.forConfig("db-config", ConfigFactory.parseString(rocketDbConfig))


    import session.profile.api._

    def persistence(message: String) = {
      def insert(message: String): DBIO[Int] = {
        sqlu"""INSERT INTO test(id) VALUES (${message})"""
      }

      session.db.run(insert(message)).map {
        case _ => message
      }.recover {
        case e : SQLException => {
          throw new Exception("Database error ===>")}
        case e : Exception => {
          throw new Exception("Database error.")}
      }

    }


    val route = path("hello" / Segment ) { name =>
        get {
          val res = persistence(name)
          onComplete(res) {
            case Success(value) => {
              complete(s"<h1>Say hello to ${name}</h1>")
            }
            case Failure(e) => {
              complete(s"<h1>Failed to say hello to ${name}</h1>")
            }
          }
        }
      }

    val bindingFuture = Http().bindAndHandle(route, "localhost", 8088)

    println(s"Server online at http://localhost:8088/\nPress RETURN to stop...")
    StdIn.readLine() // let it run until user presses return
    bindingFuture
      .flatMap(_.unbind()) // trigger unbinding from the port
      .onComplete(_ => system.terminate()) // and shutdown when done
  }
}


0 голосов
/ 10 февраля 2020

Да, в основном при каждом получении запроса в AkkaHttp, мы создаем граф AkkaStreams (обычно просто конвейер), в основном просто Slick Alpakka Source из базы данных, может быть с префиксом некоторых операторов, а затем возвращаем в AkkaHttp, который, конечно, поддерживает источник. Более подробная информация на [https://www.quora.com/Is-it-possible-to-build-an-OLTP-CRUD-HTTP-server-using-Akka-HTTP-Akka-Streams-Alpakka-and-a-database-Do-you-know-any-examples-of-code-on-GitHub-or-elsewhere/answer/Nicolae-Marasoiu]

...