У меня есть тема Kakfa, в которой данные хранятся в формате JSON. Я написал код потокового воспроизведения и хочу сохранить только значения из темы Kafka в файл в HDFS.
Вот так выглядят данные в моей теме кафки:
{"group_city":"\"Washington\"","group_country":"\"us\"","event_name":"\"Outdoor Afro Goes Ziplining\""}
Ниже приведен код, который я написал. Когда я его печатаю, я получаю проанализированный JSON, но моя проблема возникает, когда я пытаюсь сохранить только значения в текстовый файл.
val dstream = KafkaUtils.createDirectStream[String, String](ssc,preferredHosts,ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))
//___PRINTING RECORDS________
val output= dstream.foreachRDD { rdd =>
rdd.foreach { record =>
val values = record.value()
val tweet = scala.util.parsing.json.JSON.parseFull(values)
val map:Map[String,String] = tweet.get.asInstanceOf[Map[String, String]]
map.foreach(p => println(p._2))
}
}