Akka http bufferoverflowexception при отправке нескольких hjttprequests - PullRequest
0 голосов
/ 10 января 2019

Запрос http в файле ниже дает мне исключение BuferOverflowException. Я пробовал это, используя connectionFlow, но это дало эту ошибку Легальный запрос, отвечающий со статусом «400 Bad Request»: значение заголовка «Host» для запроса application url не соответствует целевому полномочию запроса: Заголовок хоста: Некоторые (Host: localhost: 9005), я заменил строку хоста на URL приложения, и он выдал ту же ошибку.

Это код, по которому вызывается BulkActor:

protected def processFileForBulkPositionAssignments(file: String, jobDetails: GetJobDetails, resultDocumentId: String) = {
    convertCsvLinesStringToBulkImportPositionByUserId(file) match {
      case Left(error) => log.info(s"BulkUploadPositionJobService :: performBulkUploadUserPositions :: " +
        s"errorMessage from convertCsvLinesStringToJsonList ${error}")
        updateJobStatus(jobDetails, None, None, FAILED_STATUS, None, None) //file could not be converted to json
      case Right(bulkImportDetails) => log.info(s"JSON STRING LIST IS :: ${bulkImportDetails}")
      // TODO: the json string list will contain the same number of lines as present in the document blob
      // TODO: process the lines as per the batch-size defined in the config file
        val bulkImportDetailBatches = bulkImportDetails.grouped(pagesize).toList
        log.info(s"BulkUploadPositionJobService :: processFileForBulkPositionAssignments :: " +
          s"total batches are :: ${bulkImportDetailBatches.size}, with batch-size :: ${pagesize}")
        bulkImportDetailBatches.map { bulkImportDetailBatch =>
          (authorizeActor ? GetClientCredentialsAccessTokenRequest()).mapTo[GetClientCredentialsAccessTokenResponse].map {
            _.accessTokenResponse match {
              case Left(error) => log.info(s"BulkUploadPositionJobService :: processFileForBulkPositionAssignments :: " +
                s"errorMessage from authorizeActor ${error}")
                updateJobStatus(jobDetails, None, None, FAILED_STATUS, None, None) //Error in access token generation
              case Right(token) =>
                log.info("ACCESS TOKEN FOUND AND IS CORRECT ...........")
                bulkImportDetailBatch.map{ userPositionMappingDetail =>
                (bulkPositionActor ? CreateBulkPositionRequest(token, userPositionMappingDetail)).mapTo[CreateBulkPositionResponse].map {
                  response => response.creationResponse match {
                    case Left(error) =>
                    case Right(message) =>
                  }
                }
              }
            }
          }
        }
    }
  }

package mcc.identity.adminbatch.actor

import akka.actor.{Actor, ActorRef}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.{Authorization, OAuth2BearerToken}
import akka.stream.scaladsl.{Flow, Sink}
import akka.stream.{ActorMaterializer, ActorMaterializerSettings}
import akka.util.ByteString
import com.typesafe.scalalogging.Logger
import mcc.identity.adminbatch.HttpServer
import mcc.identity.adminbatch.model.PositionRouteModel.BulkImportPositionByUserId
import mcc.identity.adminbatch.model.RouteModel.{ErrorDetails, ErrorMessage}
import mcc.identity.adminbatch.util.BatchConstants
import org.json4s.DefaultFormats
import org.json4s.jackson.Json
import org.json4s.jackson.JsonMethods.{compact, parse, render}
import org.slf4j.LoggerFactory

import scala.concurrent.Future

object BulkPositionActor {
  case class CreateBulkPositionRequest(accessToken: OAuth2BearerToken, addUserPositionDetails : BulkImportPositionByUserId)
  case class CreateBulkPositionResponse(creationResponse: Either[ErrorMessage, String])
}

class BulkPositionActor extends Actor {

  import BulkPositionActor._
  import context.dispatcher
  implicit val materializer: ActorMaterializer = ActorMaterializer(ActorMaterializerSettings(context.system))
  val http = Http(context.system)
  implicit val system = HttpServer.system
  lazy val log: Logger = Logger(LoggerFactory.getLogger(getClass.getName))

  val toSender = sender
  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    log.error(s"BulkPositionActor restarted ${message.getOrElse("")}", reason)
  }

  def receive: Receive = {
    case request: CreateBulkPositionRequest => addUserPositionMapping(sender, request)
  }

  val host = "localhost"//EnvironmentService.getValue(HttpServerHost)
  val port = 9005 //Integer.parseInt(EnvironmentService.getValue(HttpServerPort))

  val connectionFlow: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]] = Http().outgoingConnection(host, port)

  //val poolClientFlow = Http().cachedHostConnectionPool[HttpRequest](host, port)
//  val poolClientFlow : Flow[(HttpRequest, Int), (Try[HttpResponse], Int), Http.HostConnectionPool] =
//    Http().cachedHostConnectionPool[Int](host, port, ConnectionPoolSettings(system))

//  def performRequest(request: HttpRequest): Future[HttpResponse] =
//    Source
//      .single(request)
//   .via(connectionFlow)
//      .mapAsync(1) {
//        case (response, _) =>
//          Future.fromTry(response)
//      }
//      .runWith(Sink.head)
//
//  def performRequest(request: HttpRequest): Future[HttpResponse] =
//    Source
//      .single(request)
//      .via(connectionFlow)
//      .mapAsync(1) {
//        case (response, _) =>
//          Future.fromTry(response)
//      }
//      .runWith(Sink.head)


//  val pool: Flow[(HttpRequest, Int), (Try[HttpResponse], Int), Http.HostConnectionPool] =
//    Http().cachedHostConnectionPool[Int](host, port, ConnectionPoolSettings(system))
//
//  def pooledRequest(req: HttpRequest): Future[HttpResponse] = {
//    val unique = 123
//    Source.single(req -> unique).via(pool).runWith(Sink.head).flatMap {
//      case (Success(r: HttpResponse), `unique`) ⇒ Future.successful(r)
//      case (Failure(f), `unique`) ⇒ Future.failed(f)
//      case (_, i) ⇒ Future.failed(new Exception("Return does not match the request"))
//    }
//  }



  def createUploadRequest(accessToken: OAuth2BearerToken, addUserPositionDetails: BulkImportPositionByUserId) = {
    HttpRequest(
      uri = Uri(BatchConstants.idamAdminServicesForCreateUserPositionMapping + s"/${addUserPositionDetails.userId.getOrElse("")}/positions"),
      method = HttpMethods.POST,
      headers = List(Authorization(accessToken)),
      entity = HttpEntity.apply(ContentTypes.`application/json`, Json(DefaultFormats).write(Array(addUserPositionDetails))),
      protocol = HttpProtocols.`HTTP/1.1`)
  }


  protected def addUserPositionMapping(sender: ActorRef, request: CreateBulkPositionRequest) = {
    http.singleRequest(
      HttpRequest(
        uri = Uri(BatchConstants.idamAdminServicesForCreateUserPositionMapping + s"/${request.addUserPositionDetails.userId.getOrElse("")}/positions"),
        method = HttpMethods.POST,
        headers = List(Authorization(request.accessToken)),
        entity = HttpEntity.apply(ContentTypes.`application/json`, Json(DefaultFormats).write(Array(request.addUserPositionDetails))),
        protocol = HttpProtocols.`HTTP/1.1`)
    ) flatMap { response =>
      log.info(s"RESPONSE IS :: --------- ${response}")
      response.status.intValue() match {
        case StatusCodes.MultiStatus.intValue =>
          response.entity.dataBytes.runWith(Sink.fold(ByteString.empty)(_ ++ _)).map(_.utf8String).map { result =>
            val parsedJson = parse(result.replaceAll("\\\\", "")) \ "results"
            compact(parsedJson \\ "status").toInt match {
              case StatusCodes.OK.intValue =>
                log.info(s"BulkPositionActor :: addUserPositionMapping :: request was successful :: ${compact(parsedJson \\ "message")} , " +
                  s"for position ${request.addUserPositionDetails}")
                sender ! CreateBulkPositionResponse(Right(compact(parsedJson \\ "message")))
              case StatusCodes.InternalServerError.intValue =>
                log.info(s"BulkPositionActor :: addUserPositionMapping :: $parsedJson")
                sender ! CreateBulkPositionResponse(Left(ErrorMessage(compact(parsedJson \\ "status").toInt, StatusCodes.InternalServerError.reason, Seq())))
              case other => log.info(s"BulkPositionActor :: addUserPositionMapping :: request failed :: $parsedJson")
                sender ! CreateBulkPositionResponse(Left(ErrorMessage(compact(parsedJson \\ "status").toInt, StatusCodes.getForKey(other).get.reason,
                  Seq(ErrorDetails(compact(render(parsedJson)), compact(parsedJson \\ "errorMessage").replaceAll("\"", ""))))))
            }
          }
        case others =>
          Future.successful(sender ! CreateBulkPositionResponse(Left(ErrorMessage(others, "Response Invalid",
            Seq(ErrorDetails("response message",response.entity.dataBytes.runWith(Sink.fold(ByteString.empty)(_ ++ _))
              .map(_.utf8String).map(res => res).toString))))))
      }
    }
  }.recover {
      case ex: Exception =>
        log.error("Failed", ex)
        sender ! CreateBulkPositionResponse(Left(ErrorMessage(500,
          "Internal Server Error", Seq(ErrorDetails("Something went wrong", "Could not fetch access token")))))
    }
}
...