Как исправить pyspark для поиска поврежденных записей - PullRequest
0 голосов
/ 11 марта 2019

Итак, я пытаюсь обработать данные в Spark, поступающие из потока kafka, а затем отправить их в упругий поиск, чтобы я мог их визуализировать в Kibana.Однако, когда я вижу данные в Kibana, они отображаются как поврежденная запись, а не как ее собственное поле.

Здесь приведен код для обработки и отправки данных вasticsearch.Данные, поступающие в kafka, - это просто текстовые данные из твиттера, и я применяю к ним пару функций.

def process(time, rdd):
  print("========= %s =========" % str(time))
  try:
    sqlContext = getSqlContextInstance(rdd.context)
    df = sqlContext.read.json(rdd)
    results = df.toJSON().map(lambda j: json.loads(j)).collect()
    send_elastic(results,"index1","document")
  except:
    pass

def main():
  createIndex("index1")
  sc = SparkContext(appName="PythonStreaming", master="local[2]")
  sqlContext = SQLContext(sc)
  ssc = StreamingContext(sc, 10)
  kafkaStream = KafkaUtils.createStream(ssc, 'localhost:2181', 'spark-streaming', {'twitter':1})
  tweets = kafkaStream.map(lambda x: json.loads(x[1])).map(lambda x: json.loads(x))
  sentiments = tweets.map(lambda x: {'tweet': x['text'],'candidate': get_candidate(x['text']),'sentiment':sentiment(x['text'])})
  sentiments.foreachRDD(process)
  ssc.start()
  ssc.awaitTermination()

Это то, что данные отображаются, как в Kibana.Как вы можете видеть, она отображается как испорченная запись вместо моего намерения, которое должно состоять из поля текста, настроений и кандидатов.Я ценю любую помощь, которую я могу получить, спасибо.

вывод в кибане

1 Ответ

0 голосов
/ 12 марта 2019

Эта строка df = sqlContext.read.json(rdd) по умолчанию предполагает наличие одного объекта JSON в одной строке.

Проверьте эту ссылку как ссылка

В вашем файле json должен быть один документ на строку.Например, как показано ниже:

{ "tweet": "RT @humanidee: @john_arcadian @wikileaks @marcorubio Tweet between Bernie and Hilary", "candidate": "Bernie", "sentiment": "negative"}
{ "tweet": "RT @lissbrantley: Outside the Bernie rally in #Concord and @MSNBC is out here asking everyoneif they believe in capitalism and if not @Ber..", "candidate": "bernie", "sentiment": "neutral" }

Теперь, если вы хотите обрабатывать несколько строк, вам нужно добавить приведенный ниже код

df = spark.read.option("multiline", "true").json("multi.json")
mdf.show(false)

В качестве добавленной заметки убедитесь, что объект JSONв правильном формате.Надеюсь, это поможет!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...