dstream анализирует JSON и сохраняет в textFile: SparkStreaming - PullRequest
0 голосов
/ 02 мая 2018

У меня есть тема Kakfa, в которой данные хранятся в формате JSON. Я написал код потокового воспроизведения и хочу сохранить только значения из темы Kafka в файл в HDFS.

Вот так выглядят данные в моей теме кафки:

{"group_city":"\"Washington\"","group_country":"\"us\"","event_name":"\"Outdoor Afro Goes Ziplining\""}

Ниже приведен код, который я написал. Когда я его печатаю, я получаю проанализированный JSON, но моя проблема возникает, когда я пытаюсь сохранить только значения в текстовый файл.

val dstream = KafkaUtils.createDirectStream[String, String](ssc,preferredHosts,ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))

//___PRINTING RECORDS________
val output= dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val values = record.value()
    val tweet = scala.util.parsing.json.JSON.parseFull(values)
    val map:Map[String,String] = tweet.get.asInstanceOf[Map[String, String]]
    map.foreach(p => println(p._2))
  }
}

1 Ответ

0 голосов
/ 02 мая 2018

Вы можете сохранить rdd с помощью saveAsTextFile, но, поскольку вы хотите сохранить только значения, вы можете преобразовать их в датафрейм и записать как csv

dstream.foreachRDD(rawRDD => {

  // get the data 
  val rdd = rawRDD.map(_._2)

  rdd.saveAsTextFile("file path")

  //      or read the json String to dataframe and write as a csv

  spark.read.json(rdd).write.mode(SaveMode.Append).csv("path for output")
})

Надеюсь, это поможет!

...