Я пытаюсь написать фрагмент кода, который будет использовать поток тикеров (символ биржи компании) и получать информацию о компании из REST API для каждого тикера.
Я хочу получать информацию для нескольких компаний асинхронно.
Я хотел бы сохранить результаты в файл в непрерывном режиме, так как весь набор данных может не помещаться в память.
Следуя документации по потокам и ресурсам akka, которые мне удалось отыскать по этой теме, я разработал следующий фрагмент кода (некоторые части для краткости опущены):
implicit val actorSystem: ActorSystem = ActorSystem("stock-fetcher-system")
implicit val materializer: ActorMaterializer = ActorMaterializer(None, Some("StockFetcher"))(actorSystem)
implicit val context = system.dispatcher
import CompanyJsonMarshaller._
val parallelism = 10
val connectionPool = Http().cachedHostConnectionPoolHttps[String](s"api.iextrading.com")
val listOfSymbols = symbols.toList
val outputPath = "out.txt"
Source(listOfSymbols)
.mapAsync(parallelism) {
stockSymbol => Future(HttpRequest(uri = s"https://api.iextrading.com/1.0/stock/${stockSymbol.symbol}/company"), stockSymbol.symbol)
}
.via(connectionPool)
.map {
case (Success(response), _) => Unmarshal(response.entity).to[Company]
case (Failure(ex), symbol) => println(s"Unable to fetch char data for $symbol") "x"
}
.runWith(FileIO.toPath(new File(outputPath).toPath, Set(StandardOpenOption.APPEND)))
.onComplete { _ =>
bufferedSource.close
actorSystem.terminate()
}
Это проблемная строка:
runWith(FileIO.toPath(new File(outputPath).toPath, Set(StandardOpenOption.APPEND)))
, который не компилируется, и компилятор выдает мне таинственно выглядящую ошибку:
Type mismatch, expected Graph[SinkShape[Any, NotInferedMat2], actual Sink[ByeString, Future[IOResult]]
Если я изменю приемник на Sink.ignore или println (_), это сработает.
Буду признателен за более подробное объяснение.