In a new project I would like to use Apache Spark and Elasticsearch. I found many tutorials, and I came across this code, which I adapted:
import os
import json
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
# Make the ES-Hadoop connector jar available to the PySpark shell.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars elasticsearch-spark_2.11-2.4.5.jar pyspark-shell'
sc = SparkContext(appName="AllCountries")
# Connector settings for writing to Elasticsearch.
es_write_conf = {
    "es.nodes": "localhost",
    "es.port": "9200",
    "es.resource": "testindex/testdoc",
    "es.batch.size.entries": 1,
    "es.input.json": "yes",
    "es.mapping.id": "doc_id"
}
data = [
    {'some_key': 'some_value', 'doc_id': 123},
    {'some_key': 'some_value', 'doc_id': 456},
    {'some_key': 'some_value', 'doc_id': 789}
]
rdd = sc.parallelize(data)
def format_data(x):
    # Pair each record's doc_id with the record itself serialized as JSON.
    return (x['doc_id'], json.dumps(x))

rdd = rdd.map(format_data)
rdd.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_write_conf)
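For reference, collecting the mapped RDD shows the (doc_id, json) pairs the job hands to EsOutputFormat; this is just a sanity-check sketch over the small sample data above, since the failure happens only at the save call:

# Print each (doc_id, json_string) pair produced by format_data.
for doc_id, doc_json in rdd.collect():
    print(doc_id, doc_json)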
But I get this error:
18/11/13 15:26:45 WARN Utils: Your hostname, bailleux-VirtualBox resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
18/11/13 15:26:45 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
18/11/13 15:26:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Traceback (most recent call last):
File "/home/bailleux/dev/spark-2.4.0-bin-hadoop2.7/text.py", line 33, in <module>
conf=es_write_conf)
File "/home/bailleux/dev/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/rdd.py", line 1438, in saveAsNewAPIHadoopFile
File "/home/bailleux/dev/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
File "/home/bailleux/dev/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile.
: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
at org.apache.spark.api.python.PythonHadoopUtil$$anonfun$mapToConf$1.apply(PythonHadoopUtil.scala:160)
at org.apache.spark.api.python.PythonHadoopUtil$$anonfun$mapToConf$1.apply(PythonHadoopUtil.scala:160)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at org.apache.spark.api.python.PythonHadoopUtil$.mapToConf(PythonHadoopUtil.scala:160)
at org.apache.spark.api.python.PythonRDD$.getMergedConf(PythonRDD.scala:445)
at org.apache.spark.api.python.PythonRDD$.saveAsNewAPIHadoopFile(PythonRDD.scala:557)
at org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
I don't understand where my mistake is. Does anyone have a solution?
Thanks for your help.