Миграция из Play Websocket в Akka HTTP Websocket - PullRequest
0 голосов
/ 28 июня 2018

Я мигрирую из Play в Akka HTTP. У меня есть JAR-код зависимости, который я не могу изменить, который принимает

Flow[Array[Byte],Array[Byte],Any] 

, что обеспечивает Play для подключения через WebSocket. В HTTP Akka это определение

Flow[Message,Message,Any] 

Мне нужен перевод между двумя определениями. Я новичок в Akka http, поэтому я не совсем уверен, как поступить. В игре я также использовал ActorFlow.actorRef

handleWebSocketMessages(wsFlow)

def wsFlow: Flow[Message, Message, Any] = {
 ActorFlow.actorRef(websocket => MyBridgeActor.props(websocket))
}

Код ActorFlow зависит только от akka, поэтому я только что скопировал файл в свою базу кода. https://github.com/playframework/playframework/blob/master/framework/src/play-streams/src/main/scala/play/api/libs/streams/ActorFlow.scala

Полагаю, решением было бы создать CustomActorFlow, который будет включать преобразование из Message в Array [Byte]. MyBridgeActor принимает веб-сокет в формате Flow [Array [Byte], Array [Byte], Any].

1 Ответ

0 голосов
/ 29 июня 2018

Используя akka stream api, вы можете преобразовать поток следующим образом:

import akka.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage}
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, Sink}
import akka.util.ByteString

import scala.concurrent.Future

handleWebSocketMessages(msgFlow)

def msgFlow: Flow[Message, Message, Any] = convertFlow(bytesFlow)

def bytesFlow: Flow[Array[Byte], Array[Byte], Any] = {
  // Can just copy ActorFlow over, no need to customize
  ActorFlow.actorRef[Array[Byte], Array[Byte]](...)
}

def covertFlow(msgs: Flow[Array[Byte], Array[Byte], Any])(implicit materializer: Materializer): Flow[Message, Message, Any] =
  Flow[Message]
    .mapAsync(2)(msgToBytes)
    .via(msgs)
    .map(bytesToMsg)


def bytesToMsg(bytes: Array[Byte]): Message = {
  // This depends on your application:
  //   is the outgoing message text or binary?
  val isText = true
  if (isText) {
    TextMessage(new String(bytes, "UTF-8"))
  } else {
    BinaryMessage(ByteString(bytes))
  }
}

def msgToBytes(msg: Message)(implicit materializer: Materializer): Future[Array[Byte]] = {
  msg match {
    case TextMessage.Strict(data) =>
      Future.successful(data.getBytes("UTF-8"))
    case TextMessage.Streamed(stream) =>
      stream.fold("")(_ + _).map(_.getBytes("UTF-8")).runWith(Sink.head)
    case BinaryMessage.Strict(data) =>
      Future.successful(data.toArray[Byte])
    case BinaryMessage.Streamed(stream) =>
      stream.fold(ByteString.empty)(_ ++ _).map(_.toArray[Byte]).runWith(Sink.head)
  }
}
...