Cannot send data to Elasticsearch
0 votes / 01 February 2019

When I try to send a JSON file to Elasticsearch with PySpark, it throws an error.

Here is my code:

IN: x.take(1)
OUT: [(4673626,
'{"_c0": 0, "ID": 4673626, "Case Number": "HM274058",
"Date": "04/02/2006 01:00:00 PM", "Block": "055XX N MANGO AVE",
"IUCR": "2825", "Primary Type": "OTHER OFFENSE",
"Description": "HARASSMENT BY TELEPHONE",
"Location Description": "RESIDENCE", "Arrest": false, "Domestic": false,
"Beat": 1622, "District": 16.0, "Ward": 45.0, "Community Area": 11.0,
"FBI Code": "26", "X Coordinate": 1136872.0, "Y Coordinate": 1936499.0,
"Year": 2006, "Updated On": "04/15/2016 08:55:02 AM",
"Latitude": 41.981912692, "Longitude": "-87.771996382",
"Location": "(41.981912692, -87.771996382)",
"Date_time": "2006-04-02T13:00:00.000+03:00", "distance": 4.658237e-05}')]

# the Elasticsearch configuration (node, port, resource, ...)
IN: conf = {
    "es.node": "localhost",   # IP of the Elasticsearch node
    "es.port": "9200",        # port
    "es.resource": "test",    # index/type
    "es.input.json": "yes",   # treat the value as JSON
    "es.mapping.id": "ID",
}
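As a side note, the elasticsearch-hadoop documentation spells the host setting "es.nodes" (plural) and gives "es.resource" in the form index/type. A hedged version of the same configuration (the "doc" type name here is an assumed placeholder, not something from the post):

conf = {
    "es.nodes": "localhost",    # Elasticsearch host ("es.node" is not a documented key)
    "es.port": "9200",          # REST port
    "es.resource": "test/doc",  # index/type; "doc" is an assumed placeholder type
    "es.input.json": "yes",     # values are already serialized JSON
    "es.mapping.id": "ID",      # JSON field to use as the document _id
}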

IN: # write the RDD to Elasticsearch
x.saveAsNewAPIHadoopFile(
    path='_',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",  # get it from the pyspark jar file
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=conf
)
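For comparison, elasticsearch-hadoop examples of saveAsNewAPIHadoopFile usually also pass keyClass explicitly. A hedged sketch of the full call (using NullWritable as the key class is an assumption on my part, not something confirmed by the post):

x.saveAsNewAPIHadoopFile(
    path="-",  # the path is not meaningful for EsOutputFormat, but the argument is required
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",  # assumed key class
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=conf,
)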

and it gives me this error:

    Py4JJavaError                             Traceback (most recent call last)
    <ipython-input-90-65a90694466f> in <module>
          4     outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat", # get it from the pyspark jar file
          5     valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    ----> 6     conf=conf
          7 )

    /opt/spark/python/pyspark/rdd.py in saveAsNewAPIHadoopFile(self, path, outputFormatClass, keyClass, valueClass, keyConverter, valueConverter, conf)
       1451                          outputFormatClass,
       1452                          keyClass, valueClass,
    -> 1453                          keyConverter, valueConverter, jconf)
       1454 
       1455     def saveAsHadoopDataset(self, conf, keyConverter=None, valueConverter=None):

    /opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
    1255         answer = self.gateway_client.send_command(command)
    1256         return_value = get_return_value(
    -> 1257             answer, self.gateway_client, self.target_id, self.name)
    1258 
    1259         for temp_arg in temp_args:

    /opt/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
    ---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

    /opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
    --> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

    Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile.
    : org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:96)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1083)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1081)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1081)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1081)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply$mcV$sp(PairRDDFunctions.scala:1000)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply(PairRDDFunctions.scala:991)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply(PairRDDFunctions.scala:991)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopFile(PairRDDFunctions.scala:991)
    at org.apache.spark.api.python.PythonRDD$.saveAsNewAPIHadoopFile(PythonRDD.scala:549)
    at org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 434.0 failed 1 times, most recent failure: Lost task 3.0 in stage 434.0 (TID 32977, localhost, executor driver): org.apache.spark.SparkException: Task failed while writing rows
    at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:151)
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:79)
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:78)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[127.0.0.1:9200]] 
    at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:152)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:398)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:382)
    at org.elasticsearch.hadoop.rest.RestClient.bulk(RestClient.java:219)
    at org.elasticsearch.hadoop.rest.bulk.BulkProcessor.tryFlush(BulkProcessor.java:196)
    at org.elasticsearch.hadoop.rest.bulk.BulkProcessor.flush(BulkProcessor.java:499)
    at org.elasticsearch.hadoop.rest.bulk.BulkProcessor.add(BulkProcessor.java:127)
    at org.elasticsearch.hadoop.rest.RestRepository.doWriteToIndex(RestRepository.java:192)
    at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:172)
    at org.elasticsearch.hadoop.mr.EsOutputFormat$EsRecordWriter.write(EsOutputFormat.java:151)
    at org.apache.spark.internal.io.HadoopMapReduceWriteConfigUtil.write(SparkHadoopWriter.scala:352)
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:126)
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:123)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1414)
    at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:135)
    ... 8 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1602)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1590)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1589)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1589)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1823)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1772)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1761)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
    at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:78)
    ... 27 more
Caused by: org.apache.spark.SparkException: Task failed while writing rows
    at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:151)
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:79)
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:78)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
Caused by: org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[127.0.0.1:9200]] 
    at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:152)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:398)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:382)
    at org.elasticsearch.hadoop.rest.RestClient.bulk(RestClient.java:219)
    at org.elasticsearch.hadoop.rest.bulk.BulkProcessor.tryFlush(BulkProcessor.java:196)
    at org.elasticsearch.hadoop.rest.bulk.BulkProcessor.flush(BulkProcessor.java:499)
    at org.elasticsearch.hadoop.rest.bulk.BulkProcessor.add(BulkProcessor.java:127)
    at org.elasticsearch.hadoop.rest.RestRepository.doWriteToIndex(RestRepository.java:192)
    at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:172)
    at org.elasticsearch.hadoop.mr.EsOutputFormat$EsRecordWriter.write(EsOutputFormat.java:151)
    at org.apache.spark.internal.io.HadoopMapReduceWriteConfigUtil.write(SparkHadoopWriter.scala:352)
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:126)
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:123)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1414)
    at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:135)
    ... 8 more
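The root cause in the trace, EsHadoopNoNodesLeftException against 127.0.0.1:9200, means the connector could not reach any Elasticsearch node at that address. A quick way to confirm whether the cluster actually answers there (a hypothetical check using the requests package, not part of the original code):

import requests

# Hypothetical sanity check against the address the connector tried.
resp = requests.get("http://127.0.0.1:9200")
print(resp.status_code)  # expect 200 if Elasticsearch is reachable
print(resp.json())       # cluster name and version info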