Отправить элемент к элементу из источника в Akka Http - PullRequest
0 голосов
/ 03 июля 2018

Я разрабатываю клиент-серверное приложение, используя Akka Http и Akka Streams. Основная идея заключается в том, что сервер должен передать ответ http источником из потоков Akka.

Проблема в том, что сервер накапливает некоторые элементы перед отправкой первого сообщения клиенту. Однако мне нужно, чтобы сервер отправлял элемент элементу, как только источник создал новый элемент.

Пример кода:

case class Example(id: Long, txt: String, number: Double)

object MyJsonProtocol extends SprayJsonSupport with DefaultJsonProtocol {
  implicit val exampleFormat = jsonFormat3(Test)
}

class BatchIterator(batchSize: Int, numberOfBatches: Int, pause: FiniteDuration) extends Iterator[Array[Test]]{

  val range = Range(0, batchSize*numberOfBatches).toIterator
  val numberOfBatchesIter = Range(0, numberOfBatches).toIterator

  override def hasNext: Boolean = range.hasNext

  override def next(): Array[Test] = {
    println(s"Sleeping for ${pause.toMillis} ms")
    Thread.sleep(pause.toMillis)
    println(s"Taking $batchSize elements")
    Range(0, batchSize).map{ _ =>
      val count = range.next()
      Test(count, s"Text$count", count*0.5)
    }.toArray
  }
}

object Server extends App {
  import MyJsonProtocol._
  implicit val jsonStreamingSupport: JsonEntityStreamingSupport = EntityStreamingSupport.json()
      .withFramingRenderer(
        Flow[ByteString].intersperse(ByteString(System.lineSeparator))
      )

  implicit val system = ActorSystem("api")
  implicit val materializer = ActorMaterializer()
  implicit val executionContext = system.dispatcher

  def fetchExamples(): Source[Array[Test], NotUsed] = Source.fromIterator(() => new BatchIterator(5, 5, 2 seconds))

  val route =
    path("example") {
      complete(fetchExamples)
    }

  val bindingFuture = Http().bindAndHandle(route, "localhost", 9090)
  println("Server started at localhost:9090")
  StdIn.readLine()
  bindingFuture.flatMap(_.unbind()).onComplete(_ ⇒ system.terminate())
}

Тогда, если я выполню:

curl --no-buffer localhost:9090/example

Я получаю все элементы одновременно вместо того, чтобы получать элемент каждые 2 секунды.

Есть идеи о том, как я могу "заставить" сервер отправлять каждый элемент, когда он выходит из источника?

1 Ответ

0 голосов
/ 07 июля 2018

Наконец-то я нашел решение. Проблема была в том, что источник синхронный ... Поэтому решение состоит в том, чтобы просто вызвать функцию async

complete(fetchExamples.async)
...