Начало работы с Akka Streams Я хочу выполнить простое вычисление.Расширение базового QuickStart https://doc.akka.io/docs/akka/2.5/stream/stream-quickstart.html с помощью вызова успокоительного веб-интерфейса:
val source: Source[Int, NotUsed] = Source(1 to 100)
source.runForeach(println)
уже отлично работает для печати чисел.Но при попытке создать Actor для выполнения HTTP-запроса (действительно ли это необходимо?) В соответствии с https://doc.akka.io/docs/akka/2.5.5/scala/stream/stream-integrations.html
import akka.pattern.ask
implicit val askTimeout = Timeout(5.seconds)
val words: Source[String, NotUsed] =
Source(List("hello", "hi"))
words
.mapAsync(parallelism = 5)(elem => (ref ? elem).mapTo[String])
// continue processing of the replies from the actor
.map(_.toLowerCase)
.runWith(Sink.ignore)
я не могу заставить его скомпилироваться, так как оператор ?
не определен.Насколько я знаю, это будет определено только внутри актера.Я также еще не понимаю, где именно внутри mapAsync
должен быть вызван мой пользовательский актер.
edit
https://blog.colinbreck.com/backoff-and-retry-error-handling-for-akka-streams/ содержит хотя бы части примера.Похоже, что создание актера не обязательно, т.е.
implicit val system = ActorSystem()
implicit val ec = system.dispatcher
implicit val materializer = ActorMaterializer()
val source = Source(List("232::03::14062::19965186", "232::03::14062::19965189"))
.map(cellKey => {
val splits = cellKey.split("::")
val mcc = splits(0)
val mnc = splits(1)
val lac = splits(2)
val ci = splits(3)
CellKeySource(cellKey, mcc, mnc, lac, ci)
})
.limit(2)
.mapAsyncUnordered(2)(ck => getResponse(ck.cellKey, ck.mobileCountryCode, ck.mobileNetworkCode, ck.locationArea, ck.cellKey)("<<myToken>>"))
def getResponse(cellKey: String, mobileCountryCode:String, mobileNetworkCode:String, locationArea:String, cellId:String)(token:String): Future[String] = {
RestartSource.withBackoff(
minBackoff = 10.milliseconds,
maxBackoff = 30.seconds,
randomFactor = 0.2,
maxRestarts = 2
) { () =>
val responseFuture: Future[HttpResponse] =
Http().singleRequest(HttpRequest(uri = s"https://www.googleapis.com/geolocation/v1/geolocate?key=${token}", entity = ByteString(
// TODO use proper JSON objects
s"""
|{
| "cellTowers": [
| "mobileCountryCode": $mobileCountryCode,
| "mobileNetworkCode": $mobileNetworkCode,
| "locationAreaCode": $locationArea,
| "cellId": $cellId,
| ]
|}
""".stripMargin)))
Source.fromFuture(responseFuture)
.mapAsync(parallelism = 1) {
case HttpResponse(StatusCodes.OK, _, entity, _) =>
Unmarshal(entity).to[String]
case HttpResponse(statusCode, _, _, _) =>
throw WebRequestException(statusCode.toString() )
}
}
.runWith(Sink.head)
.recover {
case _ => throw StreamFailedAfterMaxRetriesException()
}
}
val done: Future[Done] = source.runForeach(println)
done.onComplete(_ ⇒ system.terminate())
уже является (частичным) ответом на вопрос, т. Е. Как интегрировать Akka-streams + akka-http.Однако, это не работает, то есть только выдает ошибку 400 и никогда не завершается.