Я пытаюсь создать искровое приложение, которое будет извлекать данные Twitter из Kafka, а затем выполнить обработку и, наконец, сохранить весь вывод. Ниже моя искра scala код
package sparkWCExample.spWCExample
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.SparkContext._
import org.apache.spark.{SparkConf,SparkContext }
import org.apache.spark.sql.{SQLContext,SparkSession}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.twitter.TwitterUtils
import kafka.serializer.Decoder
import kafka.serializer.StringDecoder
import com.stanford_nlp.SentimentAnalyzer.SentimentAnalyzer
import sparkWCExample.spWCExample.ExternalClass.SentimentAnalyzerDetails
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import scala.collection.Seq
object twitterSentiment {
def main(args: Array[String]) {
/* if (args.length < 4) {
System.err.println("Usage: TwitterPopularTags <consumer key> <consumer secret> " + "<access token> <access token secret> [<filters>]")
System.exit(1)
}*/
//val now = Calendar.getInstance().getTime()
val conf = new SparkConf().setAppName("twitter-stream-sentiment").setMaster("local[2]")
val spark = SparkSession.builder().config(conf).appName("CsvExample").master("local").getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
val sentiment_analyzer = new SentimentAnalyzerDetails
val ssc = new StreamingContext(sc, Seconds(5))
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "use_a_separate_group_id_for_each_stream",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("bigdata-tweets")
val stream = KafkaUtils.createDirectStream[String, String]( ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams)
)
val streamdata = stream.map(record => println("record data "+record.key+ " " +record.value))
stream.foreachRDD { rdd =>
rdd.foreach{x=> println("record -->" +x.value())
}
}
ssc.start()
ssc.awaitTermination()
}
теперь, когда я я делаю rdd.foreach
и печатаю значение x.value, я получаю следующие данные
Tweet {id = 1242897366950297602, text = 'RT @MattRiMo: #COVID ー 19 320 000 фактических данных à l 'Assurance malad ie depuis le 15 septembre 2018, dont 115 000 depuis le 1er m…', lang = 'fr', пользователь = пользователь {id = 4241334975, name = 'FEIMA', screenName = 'FeimaEditeurs', location = 'null', followCount = 1010}, retweetCount = 0, favouriteCount = 0} Tweet {id = 1242897367231315975, text = 'RT @MarceVann: Il 22 февраля. … ', Lang =' it ', пользователь = пользователь {id = 1233696142032015360, name =' Луиджи Салерно ', screenName =' LuigiSa45451613 ', location =' null ', followCount = 84}, retweetCount = 0, favouriteCount = 0} * 1 007 *
После этого меня смущает, как я могу преобразовать данные в фрейм данных или набор данных или в отображение в класс, чтобы я мог извлечь текст из данных Tweet и что-то с ним сделать. Пожалуйста, помогите