Чтобы иметь возможность использовать broadcastHub, вы должны определить два потока.Тот, который запускает ваш веб-сокет TextMessage
на broadcastHub
.Вы должны запустить его, он создает источник, который вы подключаете к каждому клиенту.
Вот эта концепция, описанная в простом исполняемом приложении.
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{BroadcastHub, Sink, Source}
import org.slf4j.LoggerFactory
import scala.concurrent.duration._
object BroadcastSink extends App {
private val logger = LoggerFactory.getLogger("logger")
implicit val actorSystem = ActorSystem()
implicit val actorMaterializer = ActorMaterializer()
val broadcastSink: Sink[String, Source[String, NotUsed]] =
BroadcastHub.sink[String]
val simpleMsgSource = Source.tick(500.milli, 500.milli, "Single Message")
val sourceForClients: Source[String, NotUsed] = simpleMsgSource.runWith(broadcastSink)
sourceForClients.to(Sink.foreach(t => logger.info(s"Client 1: $t"))).run()
Thread.sleep(1000)
sourceForClients.to(Sink.foreach(t => logger.info(s"Client 2: $t"))).run()
Thread.sleep(1000)
sourceForClients.to(Sink.foreach(t => logger.info(s"Client 3: $t"))).run()
Thread.sleep(1000)
sourceForClients.to(Sink.foreach(t => logger.info(s"Client 4: $t"))).run()
Thread.sleep(1000)
actorSystem.terminate()
}
Отпечатки
10:52:01.774 Client 1: Single Message
10:52:02.273 Client 1: Single Message
10:52:02.273 Client 2: Single Message
10:52:02.773 Client 2: Single Message
10:52:02.773 Client 1: Single Message
10:52:03.272 Client 3: Single Message
10:52:03.272 Client 2: Single Message
10:52:03.272 Client 1: Single Message
10:52:03.772 Client 1: Single Message
10:52:03.772 Client 3: Single Message
10:52:03.773 Client 2: Single Message
10:52:04.272 Client 2: Single Message
10:52:04.272 Client 4: Single Message
10:52:04.272 Client 1: Single Message
10:52:04.273 Client 3: Single Message
10:52:04.772 Client 1: Single Message
10:52:04.772 Client 2: Single Message
10:52:04.772 Client 3: Single Message
10:52:04.772 Client 4: Single Message
10:52:05.271 Client 4: Single Message
10:52:05.271 Client 1: Single Message
10:52:05.271 Client 3: Single Message
10:52:05.272 Client 2: Single Message
Если клиенты известны заранее, вам не нужен BrodacastHub и вы можете использовать метод alsoTo
:
def webSocketHandler(clients: List[Sink[Message, NotUsed]]): Flow[Message, Message, Any] = {
val flow = Flow[Message]
clients.foldLeft(flow) {case (fl, client) =>
fl.alsoTo(client)
}
}