Я пытаюсь получать поток твитов в реальном времени с помощью Spark/Scala. У меня возникли некоторые трудности.
Я использую Spark 2.0, Scala 2.11.8, spark-streaming_2.11-2.0.0.jar и spark-streaming-twitter_2.11-2.0.0.jar.
При первом же запуске программа сразу выдаёт ошибку. Похоже, виновник — ssc.awaitTermination().
Прилагаю фрагмент кода и текст ошибки. Есть идеи, что я делаю не так?
import org.apache.log4j._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter._
import twitter4j.TwitterFactory
import twitter4j.conf.ConfigurationBuilder
import java.util.Properties
import org.apache.spark.storage.StorageLevel
import twitter4j.auth.OAuthAuthorization
/**
 * Streams live tweets that match a set of track terms. For every 10-second batch it
 * prints the raw tweet texts and a count of the space-separated words in them.
 *
 * Usage: `TStreaming [trackTerm ...]`. With no arguments the single term
 * "Hurricane Florence" is used, which matches the previous hard-coded behavior.
 *
 * NOTE(review): the reported
 * `java.lang.NoSuchMethodError: twitter4j.TwitterStream.addListener(Ltwitter4j/StreamListener;)V`
 * is thrown from `TwitterReceiver.onStart`, not from `ssc.awaitTermination()`. The
 * trailing `V` in the descriptor means spark-streaming-twitter was compiled against a
 * twitter4j whose `addListener` returns `void`. The twitter4j on the runtime classpath
 * has a different signature, so this is a dependency mismatch, not a code bug.
 * Pin twitter4j-core / twitter4j-stream to the version that
 * spark-streaming-twitter_2.11-2.0.0 declares in its POM (believed to be 4.0.4 — verify),
 * and make sure no newer twitter4j jar is also on the classpath.
 */
object TStreaming {
  // Show only ERROR-level output from the "org" logger hierarchy.
  Logger.getLogger("org").setLevel(Level.ERROR)

  /** Track terms used when no command-line arguments are supplied. */
  private val DefaultTrackTerms: Seq[String] = Seq("Hurricane Florence")

  def main(args: Array[String]): Unit = {
    // local[2]: a receiver-based input stream needs more than one local thread
    // (presumably one for the receiver and one for batch processing — see the Spark Streaming guide).
    val ssc = new StreamingContext("local[2]", "TweeterStreaming", Seconds(10))

    val trackTerms: Seq[String] = if (args.nonEmpty) args.toSeq else DefaultTrackTerms

    // OAuth credentials are placeholders; supply real values (ideally from external config).
    val twitterConf = new ConfigurationBuilder()
      .setDebugEnabled(true)
      .setOAuthConsumerKey("***************")
      .setOAuthConsumerSecret("***************")
      .setOAuthAccessToken("***************")
      .setOAuthAccessTokenSecret("***************")
      .build()
    val auth = new OAuthAuthorization(twitterConf)

    val twitterStream =
      TwitterUtils.createStream(ssc, Some(auth), trackTerms, StorageLevel.MEMORY_ONLY)
    // Both output operations below derive from this stream; mark it for persistence.
    twitterStream.cache()

    val lines = twitterStream.map(status => status.getText)
    lines.print()

    val wordCounts = lines
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()            // start the receiver and batch processing
    ssc.awaitTermination() // block until the streaming context stops
  }
}
Вот ошибка:
18/09/29 10:27:10 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NoSuchMethodError: twitter4j.TwitterStream.addListener(Ltwitter4j/StreamListener;)V
at org.apache.spark.streaming.twitter.TwitterReceiver.onStart(TwitterInputDStream.scala:72)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:149)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:131)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:597)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:587)
at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:1974)
at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:1974)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
18/09/29 10:27:10 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-0,5,main]
java.lang.NoSuchMethodError: twitter4j.TwitterStream.addListener(Ltwitter4j/StreamListener;)V
at org.apache.spark.streaming.twitter.TwitterReceiver.onStart(TwitterInputDStream.scala:72)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:149)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:131)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:597)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:587)
at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:1974)
at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:1974)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
18/09/29 10:27:10 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
18/09/29 10:27:10 ERROR ReceiverTracker: Deregistered receiver for stream 0: Stopped by driver
-------------------------------------------
Time: 1538242030000 ms
-------------------------------------------
-------------------------------------------
Time: 1538242030000 ms
-------------------------------------------
Вот фрагмент кода
Вот ошибка
Заранее спасибо.