реализовать таймер в акке - PullRequest
1 голос
/ 10 июля 2019

Здравствуйте. Я пытаюсь реализовать SQL-предложение в Scala AKKA MySql, которое подключается к базе данных MySQL с помощью Slick.source, и я хотел бы выполнить запрос в таймере с определенным периодом времени.Я хотел бы получить информацию из базы данных за 10 секунд, я не нашел никакой информации о том, как это сделать.У меня вопрос: есть ли какая-либо опция в Slick для реализации таймера с предложением SQL.Есть идеи ??

class AKKAMYSQL {
  def run(connectionMysql: Config,mqttUrlSub:String,mqttUserSub:String,mqttPasswordSub:String,mqttTopicSub:String): Unit = {
print(" AKKAMYSQL ")
implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()

val configFile = new File("application.conf")
val fileConfig = ConfigFactory.parseFile(configFile)
val config = ConfigFactory.load(fileConfig)

val sinkSettingsInflux = MqttConnectionSettings(
  mqttUrlSub,
  "",
  new MemoryPersistence
).withAuth(mqttUserSub, mqttPasswordSub)

implicit val session = SlickSession.forConfig(connectionMysql)
system.registerOnTermination(session.close())
case class User(plant: String,source: String,value:String,timeStamp:String,tipo: String)
implicit val getUserResult = GetResult(r => User(r.nextString(), r.nextString(),r.nextString(),r.nextString(),r.nextString()))
import session.profile.api._
val done: Future[Done] =
  Slick.source(sql"SELECT plantnodes.xid as planta,profilesources.xid as profileSource,profilevalues.value as value,profilevalues.ts as timeStamp,profiletypes.xid as tipo FROM profileheaders  INNER JOIN profilevalues ON profilevalues.profileHeaderId = profileheaders.id INNER JOIN plantnodes ON plantnodes.id = profileheaders.plantNodeId INNER JOIN profilesources ON profilesources.id = profileheaders.profileSourceId INNER JOIN profiletypes ON profiletypes.id = profileheaders.profileTypeId WHERE profiletypes.xid = 'RTV#FR#00'  OR profiletypes.xid = 'RTV#CAVG#00' OR profiletypes.xid = 'RTV#AP#00' OR profiletypes.xid = 'RTV#RP#00' OR 'RTV#SP#00' OR 'RTV#PF#00' OR 'RTV#VAVG#00' OR 'RTV#SOC#00' OR 'RTV#TH#00' LIMIT 3".as[User])
  .via(Flow[User].map(f = u => {
    val jsonString =
      """
{
}
    val json: JsValue = Json.parse(jsonString)
    new MqttMessage(topic =mqttTopicSub,ByteString.apply(Json.stringify(res)))
  }))
    .log("user")
    .runWith(Sink.ignore)

done.onComplete {end => {
  if (end.isFailure) {
    println("Error  " + end.get.toString())
    System.exit(1)
  }
  else
    println("Finished")
}

}


}
}

1 Ответ

2 голосов
/ 10 июля 2019

Я не думаю, что slick имеет ту функциональность, которую вы ищете. Однако вы можете использовать другие конструкции akka для достижения той же цели.

Сначала создайте Actor, который выполняет ваши SQL-запросы:

object Tick

class TickActor(akkamysql : AKKAMYSQL) extends Actor {
  override def receive : Receive = {
    case Tick => akkamysql.run()
  }
}

Затем используйте этот Actor вместе с Scheduler:

ActorSystem.
val akkamysql : AKKAMYSQL = ???

val actorSystem : ActorSystem = ???

val tickActorRef = actorSystem.actorOf(Props(classOf[TickActor], akkamysql))

import system.dispatcher

val cancellable = 
  system.scheduler.schedule(0 milliseconds, 10000 milliseconds, tickActor, Tick)
...