Итак, я пытаюсь обработать данные в Spark, поступающие из потока kafka, а затем отправить их в упругий поиск, чтобы я мог их визуализировать в Kibana.Однако, когда я вижу данные в Kibana, они отображаются как поврежденная запись, а не как ее собственное поле.
Здесь приведен код для обработки и отправки данных вasticsearch.Данные, поступающие в kafka, - это просто текстовые данные из твиттера, и я применяю к ним пару функций.
def process(time, rdd):
print("========= %s =========" % str(time))
try:
sqlContext = getSqlContextInstance(rdd.context)
df = sqlContext.read.json(rdd)
results = df.toJSON().map(lambda j: json.loads(j)).collect()
send_elastic(results,"index1","document")
except:
pass
def main():
createIndex("index1")
sc = SparkContext(appName="PythonStreaming", master="local[2]")
sqlContext = SQLContext(sc)
ssc = StreamingContext(sc, 10)
kafkaStream = KafkaUtils.createStream(ssc, 'localhost:2181', 'spark-streaming', {'twitter':1})
tweets = kafkaStream.map(lambda x: json.loads(x[1])).map(lambda x: json.loads(x))
sentiments = tweets.map(lambda x: {'tweet': x['text'],'candidate': get_candidate(x['text']),'sentiment':sentiment(x['text'])})
sentiments.foreachRDD(process)
ssc.start()
ssc.awaitTermination()
Это то, что данные отображаются, как в Kibana.Как вы можете видеть, она отображается как испорченная запись вместо моего намерения, которое должно состоять из поля текста, настроений и кандидатов.Я ценю любую помощь, которую я могу получить, спасибо.
вывод в кибане