Цель : получить ответ JSON из API поиска для доступных ключевых слов в столбце данных.
+---------+--------+--------------------+------+
|searchKeyword |Response |
+---------+--------+--------------------+------+
| bags | [{"id":"4664"}..... |
| sheet | [{"id":"976"}..... |
| bottles | [{"id":"1234"}..... |
| disposable bags| [{"id":"234"}..... |
+---------+--------+--------------------+------+
Я выбрал список из нескольких ключевых слов, а затем перевернулСписок тех, кто в датафрейме.После этого я выполняю вызовы API для этих ключевых слов, выполняя сопоставления, так что для каждого раздела будет создано только одно http-соединение.
Однако, когда я выполняю действие для rdd, оно дает мне «Выключение пула соединенийошибка. "
Ниже приведен код использования отображений: -
val solrUrl = "http://%s:XXXXX/solr/%s/select?q=%s&fl=id,score&defType=edismax&wt=json"
def getHttpClient(): CloseableHttpClient = {
val httpClient: CloseableHttpClient = HttpClients.createDefault();
httpClient
}
def getResults(url:String, httpClient:org.apache.http.impl.client.CloseableHttpClient): String = {
val httpResponse = httpClient.execute(new HttpGet(url))
val entity = httpResponse.getEntity()
println(entity)
var content = ""
if (entity != null) {
val inputStream = entity.getContent()
content = scala.io.Source.fromInputStream(inputStream).getLines.mkString
inputStream.close
}
httpClient.getConnectionManager().shutdown()
return content
}
val rddResults = searchTermsDf.rdd.mapPartitions(partition => {
val connection = getHttpClient()
val newPartition = partition.map(keyword => {
val searchTerm = keyword.getString(0)
var url = solrUrl.format(HOST_IP,searchTerm)
getResults(url,connection)
}).toList // consumes the iterator, thus calls readMatchingFromDB
//println(newPartition)
connection.close()
newPartition.iterator // create a new iterator
})
rddResults.foreach(println)
Не могли бы вы мне помочь, если я что-то не так делаю.