Обработка потока akka асинхронно и запись в приемник файла - PullRequest
1 голос
/ 11 апреля 2019

Я пытаюсь написать фрагмент кода, который будет использовать поток тикеров (символ биржи компании) и получать информацию о компании из REST API для каждого тикера.

Я хочу получать информацию для нескольких компаний асинхронно.

Я хотел бы сохранить результаты в файл в непрерывном режиме, так как весь набор данных может не помещаться в память.

Следуя документации по потокам и ресурсам akka, которые мне удалось отыскать по этой теме, я разработал следующий фрагмент кода (некоторые части для краткости опущены):

  implicit val actorSystem: ActorSystem = ActorSystem("stock-fetcher-system")
  implicit val materializer: ActorMaterializer = ActorMaterializer(None, Some("StockFetcher"))(actorSystem)
  implicit val context = system.dispatcher

  import CompanyJsonMarshaller._
  val parallelism = 10
  val connectionPool = Http().cachedHostConnectionPoolHttps[String](s"api.iextrading.com")
  val listOfSymbols = symbols.toList

  val outputPath = "out.txt"  


  Source(listOfSymbols)
    .mapAsync(parallelism) {
      stockSymbol => Future(HttpRequest(uri = s"https://api.iextrading.com/1.0/stock/${stockSymbol.symbol}/company"), stockSymbol.symbol)
    }
    .via(connectionPool)
    .map {
      case (Success(response), _) => Unmarshal(response.entity).to[Company]
      case (Failure(ex), symbol)       => println(s"Unable to fetch char data for $symbol") "x"
    }
    .runWith(FileIO.toPath(new File(outputPath).toPath, Set(StandardOpenOption.APPEND)))
    .onComplete { _ =>
      bufferedSource.close
      actorSystem.terminate()
    }

Это проблемная строка:

runWith(FileIO.toPath(new File(outputPath).toPath, Set(StandardOpenOption.APPEND)))

, который не компилируется, и компилятор выдает мне таинственно выглядящую ошибку:

Type mismatch, expected Graph[SinkShape[Any, NotInferedMat2], actual Sink[ByeString, Future[IOResult]]

Если я изменю приемник на Sink.ignore или println (_), это сработает.

Буду признателен за более подробное объяснение.

1 Ответ

0 голосов
/ 11 апреля 2019

Как указывает компилятор, типы не совпадают. В звонилке .map ...

.map {
  case (Success(response), _) =>
    Unmarshal(response.entity).to[Company]
  case (Failure(ex), symbol)  =>
    println(s"Unable to fetch char data for $symbol")
    "x"
}

... вы возвращаете либо экземпляр Company, либо String, поэтому компилятор выводит ближайший супертип (или "наименьшие верхние границы") как Any. Sink ожидает элементы ввода типа ByteString, а не Any.

Один из подходов состоит в том, чтобы отправить ответ в приемник файла без демаршаллинга ответа:

Source(listOfSymbols)
  .mapAsync(parallelism) {
    ...
  }
  .via(connectionPool)
  .map(_.entity.dataBytes) // entity.dataBytes is a Source[ByteString, _]
  .flatMapConcat(identity)
  .runWith(FileIO.toPath(...))
...