HTTP-запрос в файле ниже приводит к исключению BufferOverflowException. Я пробовал использовать connectionFlow, но получил такую ошибку:
Недопустимый запрос (Illegal request), ответ со статусом «400 Bad Request»: значение заголовка «Host» в запросе к URL приложения
не соответствует целевому адресу (authority) запроса: заголовок Host: Some(Host: localhost:9005). Я заменил значение host на URL приложения, но получил ту же ошибку.
Это код, из которого вызывается BulkPositionActor:
/**
 * Converts the uploaded CSV content into position-assignment rows, splits them into
 * batches of `pagesize`, and for every batch asks `authorizeActor` for an access token
 * before asking `bulkPositionActor` to create each row's position mapping.
 *
 * If the CSV cannot be converted, or a token cannot be obtained, the job is marked with
 * `FAILED_STATUS` through `updateJobStatus`. The outcome of every individual
 * position-creation request is logged (previously both outcomes were silently dropped).
 *
 * NOTE(review): all asks are fired without waiting for earlier ones to complete, so the
 * number of in-flight HTTP requests grows with the file size; this is a likely source of
 * the connection-pool overflow seen downstream - consider bounding the parallelism.
 * NOTE(review): a failed ask (e.g. a timeout) is not handled here - confirm whether the
 * job should be marked as failed in that case.
 *
 * @param file             CSV content handed to `convertCsvLinesStringToBulkImportPositionByUserId`
 * @param jobDetails       job whose status is updated when processing cannot proceed
 * @param resultDocumentId not used by this method
 */
protected def processFileForBulkPositionAssignments(file: String, jobDetails: GetJobDetails, resultDocumentId: String) = {
  convertCsvLinesStringToBulkImportPositionByUserId(file) match {
    case Left(error) =>
      log.info(s"BulkUploadPositionJobService :: performBulkUploadUserPositions :: " +
        s"errorMessage from convertCsvLinesStringToJsonList ${error}")
      updateJobStatus(jobDetails, None, None, FAILED_STATUS, None, None) //file could not be converted to json

    case Right(bulkImportDetails) =>
      log.info(s"JSON STRING LIST IS :: ${bulkImportDetails}")
      // TODO: the json string list will contain the same number of lines as present in the document blob
      // TODO: process the lines as per the batch-size defined in the config file
      val bulkImportDetailBatches = bulkImportDetails.grouped(pagesize).toList
      log.info(s"BulkUploadPositionJobService :: processFileForBulkPositionAssignments :: " +
        s"total batches are :: ${bulkImportDetailBatches.size}, with batch-size :: ${pagesize}")

      bulkImportDetailBatches.map { bulkImportDetailBatch =>
        // A fresh token is requested for every batch.
        (authorizeActor ? GetClientCredentialsAccessTokenRequest()).mapTo[GetClientCredentialsAccessTokenResponse].map {
          _.accessTokenResponse match {
            case Left(error) =>
              log.info(s"BulkUploadPositionJobService :: processFileForBulkPositionAssignments :: " +
                s"errorMessage from authorizeActor ${error}")
              updateJobStatus(jobDetails, None, None, FAILED_STATUS, None, None) //Error in access token generation

            case Right(token) =>
              log.info("ACCESS TOKEN FOUND AND IS CORRECT ...........")
              bulkImportDetailBatch.map { userPositionMappingDetail =>
                (bulkPositionActor ? CreateBulkPositionRequest(token, userPositionMappingDetail)).mapTo[CreateBulkPositionResponse].map {
                  response => response.creationResponse match {
                    // Surface the per-row outcome instead of discarding it.
                    case Left(error) =>
                      log.error(s"BulkUploadPositionJobService :: processFileForBulkPositionAssignments :: " +
                        s"position creation failed for ${userPositionMappingDetail} :: ${error}")
                    case Right(message) =>
                      log.info(s"BulkUploadPositionJobService :: processFileForBulkPositionAssignments :: " +
                        s"position created for ${userPositionMappingDetail} :: ${message}")
                  }
                }
              }
          }
        }
      }
  }
}
package mcc.identity.adminbatch.actor
import akka.actor.{Actor, ActorRef}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.{Authorization, OAuth2BearerToken}
import akka.stream.scaladsl.{Flow, Sink}
import akka.stream.{ActorMaterializer, ActorMaterializerSettings}
import akka.util.ByteString
import com.typesafe.scalalogging.Logger
import mcc.identity.adminbatch.HttpServer
import mcc.identity.adminbatch.model.PositionRouteModel.BulkImportPositionByUserId
import mcc.identity.adminbatch.model.RouteModel.{ErrorDetails, ErrorMessage}
import mcc.identity.adminbatch.util.BatchConstants
import org.json4s.DefaultFormats
import org.json4s.jackson.Json
import org.json4s.jackson.JsonMethods.{compact, parse, render}
import org.slf4j.LoggerFactory
import scala.concurrent.Future
/** Message protocol exchanged with the `BulkPositionActor` actor. */
object BulkPositionActor {

  /**
   * Asks the actor to create one user-position mapping.
   *
   * @param accessToken            bearer token sent in the `Authorization` header
   * @param addUserPositionDetails the mapping to create
   */
  case class CreateBulkPositionRequest(
    accessToken: OAuth2BearerToken,
    addUserPositionDetails: BulkImportPositionByUserId
  )

  /**
   * Reply to a [[CreateBulkPositionRequest]].
   *
   * @param creationResponse `Right` with a message on success, `Left` with an error otherwise
   */
  case class CreateBulkPositionResponse(
    creationResponse: Either[ErrorMessage, String]
  )
}
/**
 * Actor that creates a single user-position mapping per message by POSTing the mapping
 * as JSON to the admin service and replying to the asker with a
 * `CreateBulkPositionResponse`.
 *
 * NOTE(review): every message triggers an independent `singleRequest`. Akka HTTP's host
 * connection pool rejects requests beyond its `max-open-requests` setting with a
 * `BufferOverflowException`, so callers that send many requests at once should bound
 * their parallelism (or requests should be funnelled through a bounded queue) - confirm
 * against the configured pool settings.
 */
class BulkPositionActor extends Actor {
  import BulkPositionActor._
  import context.dispatcher
  import scala.util.Try
  import scala.util.control.NonFatal

  implicit val materializer: ActorMaterializer = ActorMaterializer(ActorMaterializerSettings(context.system))
  val http = Http(context.system)
  // NOTE(review): `http` uses this actor's own system while this implicit comes from
  // HttpServer - confirm both refer to the same ActorSystem.
  implicit val system = HttpServer.system
  lazy val log: Logger = Logger(LoggerFactory.getLogger(getClass.getName))

  // NOTE(review): super.preRestart is not invoked, so the default restart handling
  // is skipped - confirm this is intended.
  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    log.error(s"BulkPositionActor restarted ${message.getOrElse("")}", reason)
  }

  def receive: Receive = {
    // The sender is resolved here, on the actor's thread, and passed along explicitly
    // because the reply is sent later from Future callbacks.
    case request: CreateBulkPositionRequest => addUserPositionMapping(sender(), request)
  }

  val host = "localhost"//EnvironmentService.getValue(HttpServerHost)
  val port = 9005 //Integer.parseInt(EnvironmentService.getValue(HttpServerPort))
  // Currently unused: requests go through `http.singleRequest`.
  val connectionFlow: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]] = Http().outgoingConnection(host, port)

  /**
   * Builds the POST request that creates the given user-position mapping.
   *
   * @param accessToken            bearer token placed in the `Authorization` header
   * @param addUserPositionDetails mapping serialised as a one-element JSON array in the body;
   *                               its `userId` (empty string when absent) becomes part of the path
   * @return the request, not yet sent
   */
  def createUploadRequest(accessToken: OAuth2BearerToken, addUserPositionDetails: BulkImportPositionByUserId): HttpRequest = {
    HttpRequest(
      uri = Uri(BatchConstants.idamAdminServicesForCreateUserPositionMapping + s"/${addUserPositionDetails.userId.getOrElse("")}/positions"),
      method = HttpMethods.POST,
      headers = List(Authorization(accessToken)),
      entity = HttpEntity.apply(ContentTypes.`application/json`, Json(DefaultFormats).write(Array(addUserPositionDetails))),
      protocol = HttpProtocols.`HTTP/1.1`)
  }

  /**
   * Sends the create-mapping request and replies to `sender` with the outcome.
   *
   * A `207 Multi-Status` response is parsed and translated by `replyForMultiStatus`.
   * Any other status is reported as "Response Invalid" together with the response body.
   * Any failure (building the request, sending it, reading or parsing the body) is
   * reported as a 500 error, so the asker always receives a reply.
   *
   * @param sender  actor to reply to
   * @param request the mapping to create and the token to authenticate with
   * @return a future completed once the reply has been sent
   */
  protected def addUserPositionMapping(sender: ActorRef, request: CreateBulkPositionRequest): Future[Unit] = {
    // Building the request can throw (e.g. on a malformed URI); lifting it into the
    // Future keeps the failure inside `recover` instead of crashing the actor.
    Future
      .fromTry(Try(createUploadRequest(request.accessToken, request.addUserPositionDetails)))
      .flatMap(httpRequest => http.singleRequest(httpRequest))
      .flatMap { response =>
        log.info(s"RESPONSE IS :: --------- ${response}")
        response.status.intValue() match {
          case StatusCodes.MultiStatus.intValue =>
            readEntityAsString(response).map(body => replyForMultiStatus(sender, request, body))
          case others =>
            // Wait for the body before replying; calling toString on the pending
            // Future would only yield the Future's own description.
            readEntityAsString(response).map { body =>
              sender ! CreateBulkPositionResponse(Left(ErrorMessage(others, "Response Invalid",
                Seq(ErrorDetails("response message", body)))))
            }
        }
      }
      .recover {
        case NonFatal(ex) =>
          log.error("Failed", ex)
          sender ! CreateBulkPositionResponse(Left(ErrorMessage(500,
            "Internal Server Error",
            Seq(ErrorDetails("Something went wrong", s"Could not create user position mapping: ${ex.getMessage}")))))
      }
  }

  /** Drains the response entity and decodes it as a UTF-8 string. */
  private def readEntityAsString(response: HttpResponse): Future[String] =
    response.entity.dataBytes.runWith(Sink.fold(ByteString.empty)(_ ++ _)).map(_.utf8String)

  /**
   * Interprets the body of a `207 Multi-Status` response and replies to `sender`.
   *
   * The `status` found under `results` decides the reply: 200 yields `Right(message)`,
   * anything else yields a `Left` carrying that status. A non-numeric status makes
   * this method throw; the caller's `recover` turns that into an error reply.
   */
  private def replyForMultiStatus(sender: ActorRef, request: CreateBulkPositionRequest, body: String): Unit = {
    val parsedJson = parse(body.replaceAll("\\\\", "")) \ "results"
    val status = compact(parsedJson \\ "status").toInt
    status match {
      case StatusCodes.OK.intValue =>
        val message = compact(parsedJson \\ "message")
        log.info(s"BulkPositionActor :: addUserPositionMapping :: request was successful :: ${message} , " +
          s"for position ${request.addUserPositionDetails}")
        sender ! CreateBulkPositionResponse(Right(message))
      case StatusCodes.InternalServerError.intValue =>
        log.info(s"BulkPositionActor :: addUserPositionMapping :: $parsedJson")
        sender ! CreateBulkPositionResponse(Left(ErrorMessage(status, StatusCodes.InternalServerError.reason, Seq())))
      case other =>
        log.info(s"BulkPositionActor :: addUserPositionMapping :: request failed :: $parsedJson")
        // The status may not be a registered code, so avoid Option.get.
        val reason = StatusCodes.getForKey(other).fold("Unknown Status")(_.reason)
        sender ! CreateBulkPositionResponse(Left(ErrorMessage(status, reason,
          Seq(ErrorDetails(compact(render(parsedJson)), compact(parsedJson \\ "errorMessage").replaceAll("\"", ""))))))
    }
  }
}