org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile error
0 votes
/ 13 November 2018

In a new project I would like to use Apache Spark and Elasticsearch. I found many tutorials, and this is the code I adapted from them:

import os
import json
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars elasticsearch-spark_2.11-2.4.5.jar pyspark-shell'

sc = SparkContext(appName="AllCountries")

es_write_conf = {
    "es.nodes": "localhost",
    "es.port": "9200",
    "es.resource": "testindex/testdoc",
    "es.batch.size.entries": 1,
    "es.input.json": "yes",
    "es.mapping.id": "doc_id"
}

data = [
    {'some_key': 'some_value', 'doc_id': 123},
    {'some_key': 'some_value', 'doc_id': 456},
    {'some_key': 'some_value', 'doc_id': 789}
]
rdd = sc.parallelize(data)

def format_data(x):
    return (data[0]['doc_id'], json.dumps(data))

rdd = rdd.map(lambda x: format_data(x))

rdd.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_write_conf)

But I get this error:

18/11/13 15:26:45 WARN Utils: Your hostname, bailleux-VirtualBox resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
18/11/13 15:26:45 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
18/11/13 15:26:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

Traceback (most recent call last):
File "/home/bailleux/dev/spark-2.4.0-bin-hadoop2.7/text.py", line 33, in <module>
    conf=es_write_conf)
File "/home/bailleux/dev/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/rdd.py", line 1438, in saveAsNewAPIHadoopFile
File "/home/bailleux/dev/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
File "/home/bailleux/dev/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile.
: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
    at org.apache.spark.api.python.PythonHadoopUtil$$anonfun$mapToConf$1.apply(PythonHadoopUtil.scala:160)
    at org.apache.spark.api.python.PythonHadoopUtil$$anonfun$mapToConf$1.apply(PythonHadoopUtil.scala:160)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at org.apache.spark.api.python.PythonHadoopUtil$.mapToConf(PythonHadoopUtil.scala:160)
    at org.apache.spark.api.python.PythonRDD$.getMergedConf(PythonRDD.scala:445)
    at org.apache.spark.api.python.PythonRDD$.saveAsNewAPIHadoopFile(PythonRDD.scala:557)
    at org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

I don't see where my mistake is; does anyone have a solution?

Thank you for your help.
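The `ClassCastException: java.lang.Integer cannot be cast to java.lang.String` is raised while Spark converts the Python `conf` dict into a Hadoop `Configuration`, which only accepts string values; `"es.batch.size.entries": 1` passes an integer. A likely additional issue is that `format_data` serializes the outer `data` list instead of its argument, so every record would get the same key and payload. A minimal sketch of the corrected pieces (pure Python, shown without the Spark calls, which stay as in the question):

```python
import json

# All conf values must be strings: the Hadoop Configuration built from this
# dict rejects non-string values, which is what triggers the
# ClassCastException in the traceback.
es_write_conf = {
    "es.nodes": "localhost",
    "es.port": "9200",
    "es.resource": "testindex/testdoc",
    "es.batch.size.entries": "1",   # "1" (string), not 1 (int)
    "es.input.json": "yes",
    "es.mapping.id": "doc_id",
}

# Use the function argument x, not the outer `data` list, so each record is
# keyed by its own doc_id and serialized individually.
def format_data(x):
    return (x['doc_id'], json.dumps(x))

# Then, as in the question:
#   rdd = sc.parallelize(data).map(format_data)
#   rdd.saveAsNewAPIHadoopFile(..., conf=es_write_conf)
```

This is a sketch of the probable fix, not a tested end-to-end job; the jar version and index names are taken from the question as-is.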
