akka stream, интегрирующий вызов веб-запроса akka-htpp в поток - PullRequest
0 голосов
/ 16 декабря 2018

Начало работы с Akka Streams Я хочу выполнить простое вычисление.Расширение базового QuickStart https://doc.akka.io/docs/akka/2.5/stream/stream-quickstart.html с помощью вызова успокоительного веб-интерфейса:

val source: Source[Int, NotUsed] = Source(1 to 100)
source.runForeach(println)

уже отлично работает для печати чисел.Но при попытке создать Actor для выполнения HTTP-запроса (действительно ли это необходимо?) В соответствии с https://doc.akka.io/docs/akka/2.5.5/scala/stream/stream-integrations.html

  import akka.pattern.ask
  implicit val askTimeout = Timeout(5.seconds)
  val words: Source[String, NotUsed] =
    Source(List("hello", "hi"))

  words
    .mapAsync(parallelism = 5)(elem => (ref ? elem).mapTo[String])
    // continue processing of the replies from the actor
    .map(_.toLowerCase)
    .runWith(Sink.ignore)

я не могу заставить его скомпилироваться, так как оператор ? не определен.Насколько я знаю, это будет определено только внутри актера.Я также еще не понимаю, где именно внутри mapAsync должен быть вызван мой пользовательский актер.

edit

https://blog.colinbreck.com/backoff-and-retry-error-handling-for-akka-streams/ содержит хотя бы части примера.Похоже, что создание актера не обязательно, т.е.

implicit val system = ActorSystem()
implicit val ec = system.dispatcher
implicit val materializer = ActorMaterializer()


val source = Source(List("232::03::14062::19965186", "232::03::14062::19965189"))
    .map(cellKey => {
      val splits = cellKey.split("::")
      val mcc = splits(0)
      val mnc = splits(1)
      val lac = splits(2)
      val ci = splits(3)
      CellKeySource(cellKey, mcc, mnc, lac, ci)
    })
    .limit(2)
    .mapAsyncUnordered(2)(ck => getResponse(ck.cellKey, ck.mobileCountryCode, ck.mobileNetworkCode, ck.locationArea, ck.cellKey)("<<myToken>>"))

  def getResponse(cellKey: String, mobileCountryCode:String, mobileNetworkCode:String, locationArea:String, cellId:String)(token:String): Future[String] = {
    RestartSource.withBackoff(
      minBackoff = 10.milliseconds,
      maxBackoff = 30.seconds,
      randomFactor = 0.2,
      maxRestarts = 2
    ) { () =>
      val responseFuture: Future[HttpResponse] =
        Http().singleRequest(HttpRequest(uri = s"https://www.googleapis.com/geolocation/v1/geolocate?key=${token}", entity = ByteString(
          // TODO use proper JSON objects
          s"""
             |{
             |  "cellTowers": [
             |    "mobileCountryCode": $mobileCountryCode,
             |    "mobileNetworkCode": $mobileNetworkCode,
             |    "locationAreaCode": $locationArea,
             |    "cellId": $cellId,
             |  ]
             |}
          """.stripMargin)))

      Source.fromFuture(responseFuture)
        .mapAsync(parallelism = 1) {
          case HttpResponse(StatusCodes.OK, _, entity, _) =>
            Unmarshal(entity).to[String]
          case HttpResponse(statusCode, _, _, _) =>
            throw WebRequestException(statusCode.toString() )
        }
    }
      .runWith(Sink.head)
      .recover {
        case _ => throw StreamFailedAfterMaxRetriesException()
      }
  }

val done: Future[Done] = source.runForeach(println)
done.onComplete(_ ⇒ system.terminate())

уже является (частичным) ответом на вопрос, т. Е. Как интегрировать Akka-streams + akka-http.Однако, это не работает, то есть только выдает ошибку 400 и никогда не завершается.

Ответы [ 2 ]

0 голосов
/ 17 декабря 2018
  1. я думаю, что вы уже нашли api как позвонить клиенту akka-http

  2. относительно вашего первого фрагмента кода, который неРабота.Я думаю, что произошло какое-то недопонимание самого примера.Вы ожидали, что код в примере будет работать после того, как только что скопировал.но цель документа состояла в том, чтобы продемонстрировать только пример / концепцию, как вы можете делегировать какую-то долго выполняющуюся задачу из потока потока и затем использовать результат, когда он будет готов.для этого был использован ask вызов актера akka, потому что вызов метода ask возвращает Future.вероятно, авторы документа просто опустили определение актера.Вы можете попробовать этот пример:

    import java.lang.System.exit
    
    import akka.NotUsed
    import akka.actor.{Actor, ActorRef, ActorSystem, Props}
    import akka.pattern.ask
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.{Sink, Source}
    import akka.util.Timeout
    
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._
    import scala.language.higherKinds
    
    object App extends scala.App {
    
      implicit val sys: ActorSystem = ActorSystem()
      implicit val mat: ActorMaterializer = ActorMaterializer()
    
      val ref: ActorRef = sys.actorOf(Props[Translator])
    
      implicit val askTimeout: Timeout = Timeout(5.seconds)
      val words: Source[String, NotUsed] = Source(List("hello", "hi"))
    
      words
        .mapAsync(parallelism = 5)(elem => (ref ? elem).mapTo[String])
        .map(_.toLowerCase)
        .runWith(Sink.foreach(println))
        .onComplete(t => {
          println(s"finished: $t")
          exit(1)
        })
    }
    
    class Translator extends Actor {
    
      override def receive: Receive = {
        case msg => sender() ! s"$msg!"
      }
    }
    
0 голосов
/ 17 декабря 2018

Вы должны импортировать шаблон запроса из Акки.

import akka.pattern.ask

Редактировать: ОК, извините, я вижу, что вы уже импортировали.Что такое ссылка в вашем коде?ActorRef?

...