Как сделать параллельный Http-запрос, используя Akka-Http? - PullRequest
1 голос
/ 06 марта 2019

Я новичок в Scala и пытаюсь создать библиотеку, в которой я получу тысячи URL-адресов.Моя работа заключается в том, чтобы загружать контент с этих URL.Я бы выбрал простую библиотеку scalaj-http, но она не служит моей цели.Код, с которым я пришел, выглядит следующим образом:

    class ProxyHttpClient {
      def get(url: String, proxy: ProxySettings,urlDownloaderConfig: 
    UrlDownloaderConfig)(implicit ec: ExecutionContext): Either[HttpError, 
    HttpSuccessResponse] = {
        implicit val system: ActorSystem = ActorSystem()
        implicit val materializer: ActorMaterializer = ActorMaterializer()


        val auth = headers.BasicHttpCredentials(proxy.userName, 
    proxy.secret)
    val httpsProxyTransport = 
      ClientTransport.httpsProxy(InetSocketAddress.createUnresolved(
    proxy.host, proxy.port), auth)
    val settings = 
ConnectionPoolSettings(system).withTransport(httpsProxyTransport)
    val response: Future[HttpResponse] = 

Http().singleRequest(HttpRequest().
withMethod(HttpMethods.GET).withUri(url), settings = settings)

    val data: Future[Either[HttpError, HttpSuccessResponse]] = `response.map {`
      case response@HttpResponse(StatusCodes.OK, _, _, _) => {
        val content: Future[String] = Unmarshal(response.entity).to[String]
        val finalContent = Await.ready(content, timeToWaitForContent).value.get.get.getBytes
        Right(HttpSuccessResponse(url, response.status.intValue(), finalContent))
      }
      case errorResponse@HttpResponse(StatusCodes.GatewayTimeout, _, _, _) => Left(HttpError(url, errorResponse.status.intValue(), errorResponse.entity.toString))
    }
    val result: Try[Either[HttpError, HttpSuccessResponse]] = Await.ready(data, timeToWaitForResponse).value.get
    val pop: Either[HttpError, HttpSuccessResponse] = try {
      result.get
    } catch {
      case e: Exception => Left(HttpError(url, HttpStatus.SC_INTERNAL_SERVER_ERROR, e.getMessage))
    }
    pop
  }
}

Для вызова get метода, который я использую

val forkJoinPool = new scala.concurrent.forkjoin.ForkJoinPool(8)
picList.par.tasksupport = new ForkJoinTaskSupport(forkJoinPool)
picList.par.map(testUrl => {
      val resp = get(url, Option(proxy))

    })

Он несколько раз работал гладко, но когда я пытался вызвать метод для 1000 URLчтобы получить изображения в размере пакета 100 он бросил ниже ошибки.После этого даже для одного URL я получаю ту же ошибку.

**java.lang.OutOfMemoryError: unable to create new native thread**
  1. Должен ли я использовать здесь актеров вместо актерской системы и посвятить этому отдельного диспетчера?

  2. Поскольку я держу содержаниеизображение, которое является двоичным, нужно ли мне позаботиться об его удалении из памяти после того, как их назначение выполнено?

Фрагмент кода будет более полезным.Заранее спасибо

Я пытался следовать онлайн-предложениям, в которых люди предлагали использовать

val blockingExecutionContext = system.dispatchers.lookup("blocking-dispatcher")

Но когда я пытался, system.dispatchers.lookup возвращает тип MessageDispacther.

implicit val system: ActorSystem = ActorSystem()
    val ex: MessageDispatcher =system.dispatchers.lookup("io-blocking-dispatcher")

Мне не хватает библиотеки или импорта?

1 Ответ

0 голосов
/ 07 марта 2019

Ваша проблема, скорее всего, связана с созданием актерской системы для каждого http-вызова. Система акторов, как правило, одна на каждое приложение.

Сделайте небольшой рефакторинг и попробуйте с ним.

class ProxyHttpClient() {
  private implicit val system: ActorSystem = ActorSystem()
  private implicit val materializer: ActorMaterializer = ActorMaterializer()

  def get(url: String, proxy: ProxySettings,urlDownloaderConfig: 
    UrlDownloaderConfig)(implicit ec: ExecutionContext): Either[HttpError, 
    HttpSuccessResponse] = {???}
}

Или извлеките систему акторов и передайте ее в качестве неявного аргумента

class ProxyHttpClient() {

  def get(url: String, proxy: ProxySettings,urlDownloaderConfig: 
    UrlDownloaderConfig)(implicit ec: ExecutionContext, system: ActorSystem, materializer: ActorMaterializer): Either[HttpError, 
    HttpSuccessResponse] = {???}
}
...