Как отправить ACK в БУДУЩЕМ? - PullRequest
0 голосов
/ 16 апреля 2019

Я использую актера в качестве приемника следующим образом:

import akka.Done
import akka.actor.{Actor, ActorLogging, Props}
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest}
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

import scala.concurrent._
import scala.util.{Failure, Success}

object WsActor {
  def props: Props = Props(new WsActor)
}

final class WsActor extends Actor with ActorLogging {

  import com.sweetsoft.WsConnector._

  implicit val materializer: Materializer = ActorMaterializer()
  implicit val ec: ExecutionContextExecutor = context.system.dispatcher
  implicit val actor = context.system

  // Future[Done] is the materialized value of Sink.foreach,
  // emitted when the stream completes
  private val incoming: Sink[Message, Future[Done]] =
  Sink.foreach[Message] {
    case message: TextMessage.Strict =>
      println(message.text)
    case _ =>
      println("Unknown messages.")
  }

  //private val outgoing: Source[Message, Promise[Option[Message]]] =
  //  Source.maybe[Message]


  log.info("Websocket actor started.")

  override def receive: Receive = {
    case Initialized =>
      log.info("Initialization to receive messages via stream.")
      sender() ! Ack
    case Completed =>
      log.info("Streams completed.")
      sender() ! Ack
    case Msg(value) =>

      // the Future[Done] is the materialized value of Sink.foreach
      // and it is completed when the stream completes
      val flow: Flow[Message, Message, Future[Done]] =
        Flow.fromSinkAndSourceMat(incoming, Source.single(TextMessage(value)))(Keep.left)

      // upgradeResponse is a Future[WebSocketUpgradeResponse] that
      // completes or fails when the connection succeeds or fails
      // and closed is a Future[Done] representing the stream completion from above
      val (upgradeResponse, _) =
      Http().singleWebSocketRequest(WebSocketRequest("ws://echo.websocket.org"), flow)

      upgradeResponse.flatMap { upgrade =>
        if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
          Future.successful(Done)
        } else {
          throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
        }
      }.andThen {
        case Success(_) =>
          log.info("Sending ACK")
          sender() ! Ack
      }.onComplete {
        case Success(_) =>
          log.info("Success proceed")
        case Failure(ex) => log.error(ex.getMessage)
      }
    //sender() ! Ack
    case Failed(ex) =>
      log.info(s"Stream failed with ${ex.getMessage}.")
  }

}  

Актор использует сообщения и перенаправляет их дальше на сервер веб-сокетов.

Где-то в коде я посылаю Ack актеру Sender, чтобы сигнализировать, что он готов к получению дальнейших сообщений.Но субъект-отправитель никогда не получает сообщение Ack.

Если я поставлю sender() ! Ack цепочку FUTURE, то она будет работать как положено.

Можно ли поместить sender() ! Ack в цепочку FUTURE`.

Ответы [ 2 ]

1 голос
/ 16 апреля 2019

Ваша ошибка в следующих строках:

}.andThen {
case Success(_) =>
  log.info("Sending ACK")
  sender() ! Ack
}.onComplete {

Обратный вызов andThen будет выполняться другим потоком, а вызов sender() может быть пустым (относительно удачным) или даже другим актером вообще. Вам необходимо перехватить отправителя перед выполнением первого асинхронного действия.

val respondTo = sender()
upgradeResponse.flatMap { upgrade =>
  if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
    Future.successful(Done)
  } else {
    throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
  }
}.andThen {
  case Success(_) =>
    log.info("Sending ACK")
    respondTo ! Ack
}.onComplete {
  case Success(_) =>
    log.info("Success proceed")
  case Failure(ex) => log.error(ex.getMessage)
}
1 голос
/ 16 апреля 2019

Попробуйте запомнить отправителя при получении сообщения, а затем использовать его позже в коде вместо sender() вместо вызова sender () в нисходящем направлении, это значение может быть не постоянным во время обработки (например, при получении других сообщений в то время, когдаВаша текущая задача не блокирует обработку очереди сообщений из-за фьючерсов)

case Msg(value) =>
  val sender = sender()
  .
  .
  sender ! Ack
...