Akka Streams WebSocket для отправки информации о произвольных событиях - PullRequest
0 голосов
/ 13 мая 2018

Я хочу реализовать сервис, где несколько клиентов могут подключаться к серверу с помощью WebSocket. Сервер должен иметь возможность транслировать сообщения всем подключенным клиентам о произвольных внутренних событиях. Пока у меня есть этот код:

import akka.http.scaladsl.server.RouteResult.route2HandlerFlow
import akka.http.scaladsl.server.Directives._
implicit val system = ActorSystem("Server")
implicit val mat = ActorMaterializer()

// The source to broadcast (just ints for simplicity)
val dataSource = Source(1 to 1000).throttle(1, 1.second, 1, ThrottleMode.Shaping).map(_.toString)

// Go via BroadcastHub to allow multiple clients to connect
val runnableGraph: RunnableGraph[Source[String, NotUsed]] =
  dataSource.toMat(BroadcastHub.sink(bufferSize = 256))(Keep.right)

val producer: Source[String, NotUsed] = runnableGraph.run()

// Optional - add sink to avoid backpressuring the original flow when no clients are attached
producer.runWith(Sink.ignore)

val wsHandler: Flow[Message, Message, NotUsed] =
  Flow[Message]
    .mapConcat(_ => Nil) // Ignore any data sent from the client
    .merge(producer)  // Stream the data we want to the client
    .map(l => TextMessage(l.toString))

val route =
  path("ws") {
    handleWebSocketMessages(wsHandler)
  }

val port = 8080

println("Starting up route")
Http().bindAndHandle(route2HandlerFlow(route), "127.0.0.1", port)
println(s"Started HTTP server on port $port")

Успешно транслирует текущие тики на подключенных клиентов. Как мне изменить этот код, чтобы он мог также передавать произвольные сообщения, а не только запланированные тики?

Разъяснение

Под «произвольными сообщениями» я имею в виду не другие источники, такие как файл или базу данных, а скорее возможность отправить сообщение специализированному Source и передать его текущим подключенным клиентам. Такое сообщение может быть результатом какого-то внутреннего системного события, которое может произойти в любое время.

Ответы [ 2 ]

0 голосов
/ 14 мая 2018

Одной из идей является использование Source.actorRef:

val (actor, source) = Source.actorRef[String](10, akka.stream.OverflowStrategy.dropTail)
  .toMat(BroadcastHub.sink[String])(Keep.both)
  .run()

val wsHandler: Flow[Message, Message, NotUsed] = Flow[Message]
  .mapConcat(_ => Nil)
  .merge(source)
  .map(l => TextMessage(l.toString))

Сообщения, отправленные материализованному ActorRef, отправляются, если есть потребность в нисходящем потоке. Если нет потребности в нисходящем направлении, элементы буферизуются, и при переполнении буфера используется предоставленная стратегия переполнения. Обратите внимание, что при таком подходе обратное давление отсутствует. Вы можете отправлять сообщения от Source, а также произвольные сообщения этому субъекту:

Source(1 to 1000)
  .throttle(1, 1.second, 1, ThrottleMode.Shaping)
  .map(_.toString)
  .runForeach(msg => actor ! msg)

actor ! "bacon"
actor ! "ribeye"
0 голосов
/ 14 мая 2018

Все, что вам нужно сделать, это изменить источник данных.

Извлечение данных из файла CSV:

val dataSource = FileIO.fromPath(Paths.get("file.csv"))
  .via(Framing.delimiter(ByteString("\n"), 256, true)
  .map(_.utf8String))

Выборка данных из SQS (Alpakka):

val dataSource = SqsSource(queue, sqsSourceSettings).take(100).map(_.getBody)

Выборка данных из таблицы с помощью Slick (Alpakka):

val dataSource = Slick.source(sql"SELECT NAME FROM USERS".as[String])

В основном вам нужно понять три вещи:

  • Источник: один выход
  • Поток: один вход, один выход
  • Раковина: один вход.

Зная это, вы можете строить линейные конвейеры, как:

source.via(flow1).via(flow2).runWith(sink)

Таким образом, вы можете легко «подключить» источники к существующему конвейеру и запускать их с любым желаемым приемником:

val pipeline = flow1.via(flow2)

val fileSource = FileIO.fromPath(Paths.get("file.csv"))
  .via(Framing.delimiter(ByteString("\n"), 256, true)
  .map(_.utf8String))
  .via(pipeline)
  .runWith(sink)

val sqsSource = Slick
  .source(sql"SELECT NAME FROM USERS".as[String])
  .via(pipeline)
  .runWith(sink)

val slickFlow = SqsSource(queue, sqsSourceSettings).take(100)
  .map(_.getBody)
  .via(pipeline)
  .runWith(sink)

Редактировать: Ну, кроме стратегии actorRef, вы также можете использовать Source.queue и создавать свои сообщения, вызывая queue.offer:

def source = Source
  .queue(Int.MaxValue, OverflowStrategy.backpressure)
  .map { name: String => s"hello, $name" }
  .toMat(BroadcastHub.sink[String])(Keep.both)
  .run()

def wsHandler(s: Source[String, NotUsed]): Flow[Message, Message, NotUsed] = Flow[Message]
  .mapConcat(_ => Nil)
  .merge(s)
  .map(TextMessage(_))

import scala.concurrent.duration._

val websocketRoute =
  path("greeter" / Segment) { name =>
    val (queue, s) = source

    Source
      .tick(
        initialDelay = 1 second,
        interval = 1 second,
        tick = None
      )
      .map { _ =>
        queue.offer(name)
      }
      .runWith(Sink.ignore)

    handleWebSocketMessages(wsHandler(s))
  }

Внешние ссылки:

...