Как использовать Akka Stream с Akk-Http для потоковой передачи ответа - PullRequest
0 голосов
/ 17 марта 2019

Я новичок в Akka Stream. Я использовал следующий код для разбора CSV.

class CsvParser(config: Config)(implicit system: ActorSystem) extends LazyLogging with NumberValidation {

  import system.dispatcher

  private val importDirectory = Paths.get(config.getString("importer.import-directory")).toFile
  private val linesToSkip = config.getInt("importer.lines-to-skip")
  private val concurrentFiles = config.getInt("importer.concurrent-files")
  private val concurrentWrites = config.getInt("importer.concurrent-writes")
  private val nonIOParallelism = config.getInt("importer.non-io-parallelism")

  def save(r: ValidReading): Future[Unit] = {
      Future()
  }

  def parseLine(filePath: String)(line: String): Future[Reading] = Future {
    val fields = line.split(";")
    val id = fields(0).toInt
    try {
      val value = fields(1).toDouble
      ValidReading(id, value)
    } catch {
      case t: Throwable =>
        logger.error(s"Unable to parse line in $filePath:\n$line: ${t.getMessage}")
        InvalidReading(id)
    }
  }

  val lineDelimiter: Flow[ByteString, ByteString, NotUsed] =
    Framing.delimiter(ByteString("\n"), 128, allowTruncation = true)

  val parseFile: Flow[File, Reading, NotUsed] =
    Flow[File].flatMapConcat { file =>
      val src = FileSource.fromFile(file).getLines()
      val source : Source[String, NotUsed] = Source.fromIterator(() => src)
      // val gzipInputStream = new GZIPInputStream(new FileInputStream(file))

      source
        .mapAsync(parallelism = nonIOParallelism)(parseLine(file.getPath))
    }

  val computeAverage: Flow[Reading, ValidReading, NotUsed] =
    Flow[Reading].grouped(2).mapAsyncUnordered(parallelism = nonIOParallelism) { readings =>
      Future {
        val validReadings = readings.collect { case r: ValidReading => r }
        val average = if (validReadings.nonEmpty) validReadings.map(_.value).sum / validReadings.size else -1
        ValidReading(readings.head.id, average)
      }
    }

  val storeReadings: Sink[ValidReading, Future[Done]] =
    Flow[ValidReading]
      .mapAsyncUnordered(concurrentWrites)(save)
      .toMat(Sink.ignore)(Keep.right)

  val processSingleFile: Flow[File, ValidReading, NotUsed] =
    Flow[File]
      .via(parseFile)
      .via(computeAverage)

  def importFromFiles = {
    implicit val materializer = ActorMaterializer()

    val files = importDirectory.listFiles.toList
    logger.info(s"Starting import of ${files.size} files from ${importDirectory.getPath}")

    val startTime = System.currentTimeMillis()

    val balancer = GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val balance = builder.add(Balance[File](concurrentFiles))
      val merge = builder.add(Merge[ValidReading](concurrentFiles))

      (1 to concurrentFiles).foreach { _ =>
        balance ~> processSingleFile ~> merge
      }

      FlowShape(balance.in, merge.out)
    }

    Source(files)
      .via(balancer)
      .withAttributes(ActorAttributes.supervisionStrategy { e =>
        logger.error("Exception thrown during stream processing", e)
        Supervision.Resume
      })
      .runWith(storeReadings)
      .andThen {
        case Success(_) =>
          val elapsedTime = (System.currentTimeMillis() - startTime) / 1000.0
          logger.info(s"Import finished in ${elapsedTime}s")
        case Failure(e) => logger.error("Import failed", e)
      }
  }
}

Я хотел использовать Akka HTTP, который дал бы все ValidReading сущности, проанализированные из CSV, но я не мог понять, как мне это сделать.

Приведенный выше код выбирает файл с сервера и анализирует каждую строку для генерации ValidReading.

Как я могу передать / загрузить CSV через akka-http, проанализировать файл и передать полученный ответ обратно в конечную точку?

1 Ответ

0 голосов
/ 17 марта 2019

«Суть» решения примерно такая:

import akka.http.scaladsl.server.Directives._
val route = fileUpload("csv") {
  case (metadata, byteSource) =>
    val source = byteSource.map(x => x)
    complete(HttpResponse(entity = HttpEntity(ContentTypes.`text/csv(UTF-8)`, source)))
}

Вы обнаруживаете, что загруженная вещь представляет собой данные из нескольких частей формы с чанком с именем «csv».Вы получаете byteSource от этого.Выполните расчет (вставьте свою логику в часть .map(x=>x)).Преобразуйте ваши данные обратно в ByteString.Завершите запрос с новым источником.Это сделает ваше присоединение как прокси.

...