Spark: несколько вызовов API с использованием mappartitions, в результате чего java.lang.illegalstateexception: пул соединений отключен - PullRequest
0 голосов
/ 06 мая 2019

Цель : получить ответ JSON из API поиска для доступных ключевых слов в столбце данных.

+---------+--------+--------------------+------+
|searchKeyword   |Response                        |
+---------+--------+--------------------+------+
|  bags          |    [{"id":"4664"}.....      |
| sheet          |    [{"id":"976"}.....      |
| bottles        |    [{"id":"1234"}.....      |
| disposable bags|    [{"id":"234"}.....      |
+---------+--------+--------------------+------+

Я выбрал список из нескольких ключевых слов, а затем перевернулСписок тех, кто в датафрейме.После этого я выполняю вызовы API для этих ключевых слов, выполняя сопоставления, так что для каждого раздела будет создано только одно http-соединение.

Однако, когда я выполняю действие для rdd, оно дает мне «Выключение пула соединенийошибка. "

Ниже приведен код использования отображений: -

val solrUrl = "http://%s:XXXXX/solr/%s/select?q=%s&fl=id,score&defType=edismax&wt=json"

def getHttpClient(): CloseableHttpClient = {
    val httpClient: CloseableHttpClient = HttpClients.createDefault();
    httpClient
  }


def getResults(url:String, httpClient:org.apache.http.impl.client.CloseableHttpClient): String = {
    val httpResponse = httpClient.execute(new HttpGet(url))
    val entity = httpResponse.getEntity()
    println(entity)
    var content = ""
    if (entity != null) {
      val inputStream = entity.getContent()
      content = scala.io.Source.fromInputStream(inputStream).getLines.mkString
      inputStream.close
    }
    httpClient.getConnectionManager().shutdown()
    return content
  }




val rddResults = searchTermsDf.rdd.mapPartitions(partition => {
  val connection = getHttpClient() 
  val newPartition = partition.map(keyword => {


  val searchTerm = keyword.getString(0)

  var url = solrUrl.format(HOST_IP,searchTerm)

  getResults(url,connection)
  }).toList // consumes the iterator, thus calls readMatchingFromDB 

  //println(newPartition)
  connection.close()
  newPartition.iterator // create a new iterator
})

rddResults.foreach(println)

Не могли бы вы мне помочь, если я что-то не так делаю.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...