Akka Streams и сервер Scala Play - PullRequest
0 голосов
/ 15 мая 2018

У меня есть сервер, написанный на scala play 2.6

Я пытаюсь получить веб-сокет

  1. Получить запрос от клиента
  2. Обработка этого запроса
  3. Передать результат всем клиентам, если Right передает сообщение об ошибке только клиент, отправивший запрос, если ушел

У меня есть сообщения, транслируемые всем клиентам прямо сейчас, кто-нибудь знает, как ответить только отправителю в случае ошибки?

  val processFlow = Flow[String].map(process).map(_.toString)

  val (sink, source) = { 
    MergeHub.source[String]
      .via(processFlow)
      .toMat(BroadcastHub.sink[String])(Keep.both)
      .run()
  }

  val websocketFlow = Flow.fromSinkAndSource(sink, source)

  def ws = WebSocket.accept[String, String] { request =>  
    websocketFlow
  }

  def process(message: String): Either[String, String] = { 
    if (message == "error") { // replace with any error condition
      Left ("ERROR " ++ message)
    } else {
      Right (message ++ " processed")
    }   
  }

1 Ответ

0 голосов
/ 15 мая 2018

Если вы отслеживаете отправителя в своем потоке, вы можете отфильтровать полученное сообщение перед отправкой его в веб-сокет:

case class ProcessResult(senderId: String, result: Either[String, String])

val (sink, source) = { 
  MergeHub.source[ProcessResult]
    .toMat(BroadcastHub.sink[ProcessResult])(Keep.both)
    .run()
}
val websocketFlow = Flow.fromSinkAndSource(sink, source)

def ws = WebSocket.accept[String, String] { request =>
  // create a random id to identify the sender
  val senderId = UUID.randomUUID().toString
  Flow[String]
    .map(process)
    .map(result => ProcessResult(senderId, result))
    // broadcast the result to the other websockets
    .via(websocketFlow)
    // filter the results to only keep the errors for the sender
    .collect {
      case ProcessResult(sender, Left(error)) if sender == senderId => List(error)
      case ProcessResult(_, Left(error)) => List.empty
      case ProcessResult(_, Right(result)) => List(result)
    }.mapConcat(identity)
}

def process(message: String): Either[String, String] = { 
  if (message == "error") { // replace with any error condition
    Left ("ERROR " ++ message)
  } else {
    Right (message ++ " processed")
  }   
}
...