Преобразование потока данных InputDStream [ConsumerRecord [String, String]] в объект Twitter - PullRequest
1 голос
/ 25 марта 2020

Я пытаюсь создать искровое приложение, которое будет извлекать данные Twitter из Kafka, а затем выполнить обработку и, наконец, сохранить весь вывод. Ниже моя искра scala код

package sparkWCExample.spWCExample

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.SparkContext._
import org.apache.spark.{SparkConf,SparkContext }
import org.apache.spark.sql.{SQLContext,SparkSession}


import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.twitter.TwitterUtils
import kafka.serializer.Decoder
import kafka.serializer.StringDecoder

import com.stanford_nlp.SentimentAnalyzer.SentimentAnalyzer
import sparkWCExample.spWCExample.ExternalClass.SentimentAnalyzerDetails

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import scala.collection.Seq

object twitterSentiment  {

  def main(args: Array[String]) {
 /* if (args.length < 4) {
  System.err.println("Usage: TwitterPopularTags <consumer key> <consumer secret> " + "<access token> <access token secret> [<filters>]")
  System.exit(1)
  }*/
  //val now = Calendar.getInstance().getTime()
        val conf = new SparkConf().setAppName("twitter-stream-sentiment").setMaster("local[2]")
    val spark = SparkSession.builder().config(conf).appName("CsvExample").master("local").getOrCreate()
    val sc: SparkContext = spark.sparkContext

 sc.setLogLevel("WARN") 
 val sentiment_analyzer = new SentimentAnalyzerDetails
 val ssc = new StreamingContext(sc, Seconds(5)) 

        val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("bigdata-tweets")
val stream = KafkaUtils.createDirectStream[String, String]( ssc,  PreferConsistent,  Subscribe[String, String](topics, kafkaParams)
)

val streamdata = stream.map(record => println("record data "+record.key+ " "  +record.value))
stream.foreachRDD { rdd =>
  rdd.foreach{x=> println("record -->"  +x.value())
    }
}

ssc.start()
ssc.awaitTermination()

   }

теперь, когда я я делаю rdd.foreach и печатаю значение x.value, я получаю следующие данные

Tweet {id = 1242897366950297602, text = 'RT @MattRiMo: #COVID ー 19 320 000 фактических данных à l 'Assurance malad ie depuis le 15 septembre 2018, dont 115 000 depuis le 1er m…', lang = 'fr', пользователь = пользователь {id = 4241334975, name = 'FEIMA', screenName = 'FeimaEditeurs', location = 'null', followCount = 1010}, retweetCount = 0, favouriteCount = 0} Tweet {id = 1242897367231315975, text = 'RT @MarceVann: Il 22 февраля. … ', Lang =' it ', пользователь = пользователь {id = 1233696142032015360, name =' Луиджи Салерно ', screenName =' LuigiSa45451613 ', location =' null ', followCount = 84}, retweetCount = 0, favouriteCount = 0} * 1 007 *

После этого меня смущает, как я могу преобразовать данные в фрейм данных или набор данных или в отображение в класс, чтобы я мог извлечь текст из данных Tweet и что-то с ним сделать. Пожалуйста, помогите

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...