Простой серверный push-поток с Akka - PullRequest
0 голосов
/ 14 февраля 2019

Я изо всех сил пытаюсь реализовать - довольно простой - поток Akka.Вот что мне нужно:

akka-flow

У меня есть один сервер и n клиентов, и я хочу иметь возможность реагировать на внешние события, передавая сообщения(JSON) для клиентов.Клиенты могут зарегистрироваться / отменить регистрацию в любое время.

Так, например:

  • 1 клиент зарегистрирован
  • сервер генерирует событие («Hello World!»)
  • сервер транслирует «Hello World!»всем клиентам (один клиент)
  • новый клиент открывает соединение через веб-сокет
  • сервер генерирует другое событие («Hello Akka!»)
  • сервер передает «Hello Akka!"всем клиентам (двум клиентам)

Вот что у меня есть:

def route: Route = {
   val register = path("register") {
     // registration point for the clients
     handleWebSocketMessages(serverPushFlow)
   }
}

// ...

def broadcast(msg: String): Unit = {
  // use the previously created flow to send messages to all clients
  // ???
}

// my broadcast sink to send messages to the clients
val broadcastSink: Sink[String, Source[String, NotUsed]] = BroadcastHub.sink[String]

// a source that emmits simple strings
val simpleMsgSource = Source(Nil: List[String])

def serverPushFlow = {
  Flow[Message].mapAsync(1) {
    case TextMessage.Strict(text) =>       Future.successful(text)
    case streamed: TextMessage.Streamed => streamed.textStream.runFold("")(_ ++ _)
  }
  .via(Flow.fromSinkAndSource(broadcastSink, simpleMsgSource))
  .map[Message](string => TextMessage(string))
}

1 Ответ

0 голосов
/ 14 февраля 2019

Чтобы иметь возможность использовать broadcastHub, вы должны определить два потока.Тот, который запускает ваш веб-сокет TextMessage на broadcastHub.Вы должны запустить его, он создает источник, который вы подключаете к каждому клиенту.

Вот эта концепция, описанная в простом исполняемом приложении.

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{BroadcastHub, Sink, Source}
import org.slf4j.LoggerFactory

import scala.concurrent.duration._

object BroadcastSink extends App {

  private val logger = LoggerFactory.getLogger("logger")

  implicit val actorSystem = ActorSystem()
  implicit val actorMaterializer = ActorMaterializer()

  val broadcastSink: Sink[String, Source[String, NotUsed]] =
    BroadcastHub.sink[String]

  val simpleMsgSource = Source.tick(500.milli, 500.milli, "Single Message")

  val sourceForClients: Source[String, NotUsed] = simpleMsgSource.runWith(broadcastSink)

  sourceForClients.to(Sink.foreach(t => logger.info(s"Client 1: $t"))).run()
  Thread.sleep(1000)

  sourceForClients.to(Sink.foreach(t => logger.info(s"Client 2: $t"))).run()
  Thread.sleep(1000)

  sourceForClients.to(Sink.foreach(t => logger.info(s"Client 3: $t"))).run()
  Thread.sleep(1000)

  sourceForClients.to(Sink.foreach(t => logger.info(s"Client 4: $t"))).run()
  Thread.sleep(1000)

  actorSystem.terminate()
}

Отпечатки

10:52:01.774 Client 1: Single Message
10:52:02.273 Client 1: Single Message
10:52:02.273 Client 2: Single Message
10:52:02.773 Client 2: Single Message
10:52:02.773 Client 1: Single Message
10:52:03.272 Client 3: Single Message
10:52:03.272 Client 2: Single Message
10:52:03.272 Client 1: Single Message
10:52:03.772 Client 1: Single Message
10:52:03.772 Client 3: Single Message
10:52:03.773 Client 2: Single Message
10:52:04.272 Client 2: Single Message
10:52:04.272 Client 4: Single Message
10:52:04.272 Client 1: Single Message
10:52:04.273 Client 3: Single Message
10:52:04.772 Client 1: Single Message
10:52:04.772 Client 2: Single Message
10:52:04.772 Client 3: Single Message
10:52:04.772 Client 4: Single Message
10:52:05.271 Client 4: Single Message
10:52:05.271 Client 1: Single Message
10:52:05.271 Client 3: Single Message
10:52:05.272 Client 2: Single Message

Если клиенты известны заранее, вам не нужен BrodacastHub и вы можете использовать метод alsoTo:

  def webSocketHandler(clients: List[Sink[Message, NotUsed]]): Flow[Message, Message, Any] = {
    val flow = Flow[Message]
    clients.foldLeft(flow) {case (fl, client) =>
      fl.alsoTo(client)
    }
  }
...