Невозможно выполнить запрос mtermvectors elasticsearch из кластера AWS EMR с помощью Spark - PullRequest
1 голос
/ 12 июля 2020

Я пытаюсь выполнить этот запрос elasticsearch через искру:

    POST /aa6/_mtermvectors  
    {
      "ids": [
        "ABC",
        "XYA",
        "RTE"
      ],
      "parameters": {
        "fields": [
          "attribute"
        ],
        "term_statistics": true,
        "offsets": false,
        "payloads": false,
        "positions": false
      }
    }

Код, который я написал в Zeppelin:

def createString():String = {
    return s"""_mtermvectors {
  "ids": [
    "ABC",
    "XYA",
    "RTE"
  ],
  "parameters": {
    "fields": [
      "attribute"
    ],
    "term_statistics": true,
    "offsets": false,
    "payloads": false,
    "positions": false
    }
  }"""
}

import org.elasticsearch.spark._
sc.esRDD("aa6", "?q="+createString).count   

Я получаю сообщение об ошибке:

org.elasticsearch.had oop .rest.EsHadoopInvalidRequest: org.elasticsearch.had oop .rest.EsHadoopRemoteException: parse_exception: parse_exception: Encountered " «[« RTE »,« XYA »,« AB C »« »в строке 1, столбце 22. Ожидал:« TO »...

{"query":{"query_string":{"query":"_mtermvectors {\"ids\": [\"RTE\",\"ABC\",\"XYA\"], \"parameters\": {\"fields\": [\"attribute\"], \"term_statistics\": true, \"offsets\": false, \"payloads\": false, \"positions\": false } }"}}}
    at org.elasticsearch.hadoop.rest.RestClient.checkResponse(RestClient.java:477)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:434)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:428)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:408)

Это, наверное, что-то простое, но я Я не могу найти способ установить тело запроса при вызове искры

1 Ответ

0 голосов
/ 12 июля 2020

Я не уверен, но не думаю, что в настоящее время это поддерживается пакетом es-Spark. Вы можете проверить this link, чтобы узнать, какие варианты доступны через sparkContext из esRDD.

Вместо этого вы можете использовать High Level Rest Client Elasticsearch и получите подробности в List, Seq или любом файле, а затем загрузите это в Spark RDD.

Это кругосветный способ, но, к сожалению, я полагаю, что это единственный способ. Чтобы помочь, я создал приведенный ниже фрагмент, чтобы у вас, по крайней мере, были необходимые данные из Elasticsearch, связанные с указанным выше запросом.

import org.apache.http.HttpHost
import org.elasticsearch.client.RequestOptions
import org.elasticsearch.client.RestClient
import org.elasticsearch.client.RestHighLevelClient
import org.elasticsearch.client.core.MultiTermVectorsRequest
import org.elasticsearch.client.core.TermVectorsRequest
import org.elasticsearch.client.core.TermVectorsResponse

object SampleSparkES {
  
  /**
   * Main Class where program starts
   */
  def main(args: Array[String]) = {

     val termVectorsResponse = elasticRestClient
  
     println(termVectorsResponse.size)
  
   }
  
  /**
   * Scala client code to retrieve the response of mtermVectors 
   */
  def elasticRestClient : java.util.List[TermVectorsResponse] = {
    
    val client = new RestHighLevelClient(
                        RestClient.builder(
                            new HttpHost("localhost", 9200, "http")))    
    
    val tvRequestTemplate = new TermVectorsRequest("aa6","ids"); 
    tvRequestTemplate.setFields("attribute");
    
    //Set the document ids you want for collecting the term Vector information
    val ids = Array("1", "2", "3");
    val request = new MultiTermVectorsRequest(ids, tvRequestTemplate); 
    val response = client.mtermvectors(request, RequestOptions.DEFAULT)
    
    //get the response
    val termVectorsResponse = response.getTermVectorsResponses   
    
    //close RestHighLevelClient
    client.close();    
    
    //return  List[TermVectorsResponse]
    termVectorsResponse
  }
}

В качестве примера вы можете получить sumDocFreq первого документа ниже

println(termVectorsResponse.iterator.next.getTermVectorsList.iterator.next.getFieldStatistics.getSumDocFreq)

Все, что вам теперь нужно, это найти способ преобразовать коллекцию в Seq таким образом, чтобы его можно было загрузить в RDD.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...