Не могу вставить в eslaticsearch, используя искру и кафку с питоном - PullRequest
0 голосов
/ 16 мая 2019

Я интегрирую кафку и искру с упругим поиском.

когда я запускаю этот скрипт, используя:

sudo spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.1 --jars jars/elasticsearch-hadoop-2.1.0.Beta2.jar  cv.py localhost:9092 flumekafka

Скрипт 'cv.py':

import json
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext 
from pyspark.streaming.kafka import KafkaUtils
#import org.elasticsearch.spark.rdd.EsSpark
if __name__ == "__main__":
    sc = SparkContext(appName="kafka")
    ssc = StreamingContext(sc, 2)
    brokers, topic = sys.argv[1:]
    kvs = KafkaUtils.createDirectStream(ssc, [topic], 
       {"metadata.broker.list": brokers})
    es_write_conf = {"es.nodes" : 'localhost',"es.port" : 
    '9200',"es.resource" : 'rh/cv',"es.input.json" : "yes"}

    parsed = kvs.map(lambda v: json.loads(v[1]))
   # rdd = sc.parallelize(kvs)
    #def format_data(x):
      #  return (data['doc_id'], json.dumps(data))

    #rdd = rdd.map(lambda x: format_data(x))
    parsed.saveAsNewAPIHadoopFile(path='-', 
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat", 
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf=es_write_conf)
    #parsed.pprint()

    ssc.start()
    ssc.awaitTermination()

У меня есть эта проблема:

parsed.saveAsNewAPIHadoopFile(path='-',  outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat", 
keyClass="org.apache.hadoop.io.NullWritable",   valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",   conf=es_write_conf)
AttributeError: 'KafkaTransformedDStream' object has no attribute 'saveAsNewAPIHadoopFile'

что я могу сделать, чтобы мой скрипт вставлялся вasticsearc или есть другое решение.

...