Akka Streams / HTTP: получить оригинальный запрос от ответа - PullRequest
0 голосов
/ 14 мая 2018

У меня есть источник Akka Streams, который проходит через поток и отправляет HTTP-запрос:

source.map(toRequest)
  .via(Http().outgoingConnection(host))
  .map(toMessage) 

Предположим, что метод toRequest отображает строку в HttpRequest, а метод toMessage отображает HttpResponse в класс сообщений, необходимый для обработки в нисходящем направлении. Предположим, что класс сообщения должен содержать некоторую исходную информацию.

Можно ли получить оригинал HttpRequest от HttpResponse? Если нет, есть ли способ сохранить некоторую исходную информацию?

Ответы [ 2 ]

0 голосов
/ 14 мая 2018

Вы можете использовать свой api-график для обхода вашего запроса HttpRequest.Один из примеров:

object Main extends App {

  implicit val as = ActorSystem("Test")
  implicit val m = ActorMaterializer()
  implicit val ec = as.dispatcher

  def src1 = Source.fromIterator(() => List(1, 2, 3).toIterator)

  val srcGraph = Source.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
    import GraphDSL.Implicits._

    // Create one flow to prepend the 'Number' string to the input integer
    def flow1 = Flow[Int].map { el => s"Number $el"  }

    // Create a broadcast stage 
    val broadcast = builder.add(Broadcast[Int](2))
    val zip       = builder.add(Zip[Int, String]())

    src1 ~> broadcast.in

    // The 0 port sends the int to the zip stage directly
    broadcast.out(0) ~>          zip.in0
    broadcast.out(1) ~> flow1 ~> zip.in1

    SourceShape(zip.out)
  })

  Source.fromGraph(srcGraph).runForeach(println(_))
}

График API предоставляет множество возможностей для таких вещей.

0 голосов
/ 14 мая 2018

Один из подходов заключается в использовании Future варианта клиентского API, основанного на *, и пользовательского класса наблюдений, который содержит информацию, которую вы хотите распространять в нисходящем направлении. Например:

case class Message(request: HttpRequest, response: HttpResponse)

source
  .map(toRequest)
  .mapAsync(parallelism = 3) { request => // adjust the level of parallelism as needed
    Http().singleRequest(request).map(response => Message(request, response))
  }
  // continue processing; at this point you have a Source[Message, _]
...