Я интегрирую кафку и искру с упругим поиском.
когда я запускаю этот скрипт, используя:
sudo spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.1 --jars jars/elasticsearch-hadoop-2.1.0.Beta2.jar cv.py localhost:9092 flumekafka
Скрипт 'cv.py':
import json
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
#import org.elasticsearch.spark.rdd.EsSpark
if __name__ == "__main__":
sc = SparkContext(appName="kafka")
ssc = StreamingContext(sc, 2)
brokers, topic = sys.argv[1:]
kvs = KafkaUtils.createDirectStream(ssc, [topic],
{"metadata.broker.list": brokers})
es_write_conf = {"es.nodes" : 'localhost',"es.port" :
'9200',"es.resource" : 'rh/cv',"es.input.json" : "yes"}
parsed = kvs.map(lambda v: json.loads(v[1]))
# rdd = sc.parallelize(kvs)
#def format_data(x):
# return (data['doc_id'], json.dumps(data))
#rdd = rdd.map(lambda x: format_data(x))
parsed.saveAsNewAPIHadoopFile(path='-',
outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
keyClass="org.apache.hadoop.io.NullWritable",
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=es_write_conf)
#parsed.pprint()
ssc.start()
ssc.awaitTermination()
У меня есть эта проблема:
parsed.saveAsNewAPIHadoopFile(path='-', outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
keyClass="org.apache.hadoop.io.NullWritable", valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=es_write_conf)
AttributeError: 'KafkaTransformedDStream' object has no attribute 'saveAsNewAPIHadoopFile'
что я могу сделать, чтобы мой скрипт вставлялся вasticsearc или есть другое решение.