Spark & Scala для трансляции в Twitter - PullRequest
0 голосов
/ 29 сентября 2018

Я пытаюсь получать поток живых твитов с помощью Spark / Scala. У меня возникли некоторые трудности.

Я использую Spark 2.0, scala 2.11.8, spark-streaming_2.11-2.0.0.jar & spark-streaming-twitter_2.11-2.0.0.jar.

Код запускается в первый раз и сразу выдаёт ошибку. Виновник — ssc.awaitTermination().

Прикладываю фрагмент кода и текст ошибки. Есть идеи, что я делаю не так?

import org.apache.log4j._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter._
import twitter4j.TwitterFactory
import twitter4j.conf.ConfigurationBuilder
import java.util.Properties
import org.apache.spark.storage.StorageLevel
import twitter4j.auth.OAuthAuthorization

/**
 * Small Spark Streaming job that reads live tweets matching a fixed filter
 * and, for every 10-second batch, prints the tweet texts followed by a
 * per-word occurrence count.
 *
 * NOTE(review): a `NoSuchMethodError` on
 * `twitter4j.TwitterStream.addListener(StreamListener)V` raised from
 * `TwitterReceiver.onStart` is a binary-compatibility problem, not a bug in
 * this code: it suggests the twitter4j jars on the classpath differ from the
 * version spark-streaming-twitter was compiled against. Verify that the
 * twitter4j-core / twitter4j-stream versions match the ones declared by the
 * spark-streaming-twitter artifact in use.
 */
object TStreaming {
  // Only log errors from loggers under the "org" namespace.
  Logger.getLogger("org").setLevel(Level.ERROR)

  /**
   * Builds the streaming pipeline, starts it and blocks until it terminates.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {

    // "local[2]": per the Spark Streaming guide a receiver-based stream needs
    // more than one local thread (one for the receiver, one for processing).
    val ssc = new StreamingContext("local[2]", "TweeterStreaming", Seconds(10))
    val hashTags = "Hurricane Florence"

    // TODO: load the OAuth credentials from an external resource (for example
    // a "twitter.properties" file on the classpath) instead of hard-coding them.
    val bld = new ConfigurationBuilder()
      .setDebugEnabled(true)
      .setOAuthConsumerKey("***************")
      .setOAuthConsumerSecret("***************")
      .setOAuthAccessToken("***************")
      .setOAuthAccessTokenSecret("***************")
      .build()

    val filters = Seq(hashTags)
    val auth = new OAuthAuthorization(bld)
    val twitterStream =
      TwitterUtils.createStream(ssc, Some(auth), filters, StorageLevel.MEMORY_ONLY)

    // The stream feeds two outputs below (via `lines`), so keep it cached.
    twitterStream.cache()

    // Output 1: the raw text of every received tweet.
    val lines = twitterStream.map(status => status.getText)
    lines.print()

    // Output 2: word counts within each batch, splitting on single spaces.
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()            // Start the computation
    ssc.awaitTermination() // Block until the job is stopped or fails
  }

}

Вот ошибка ...

18/09/29 10:27:10 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NoSuchMethodError: twitter4j.TwitterStream.addListener(Ltwitter4j/StreamListener;)V
    at org.apache.spark.streaming.twitter.TwitterReceiver.onStart(TwitterInputDStream.scala:72)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:149)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:131)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:597)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:587)
    at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:1974)
    at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:1974)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
18/09/29 10:27:10 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-0,5,main]
java.lang.NoSuchMethodError: twitter4j.TwitterStream.addListener(Ltwitter4j/StreamListener;)V
    at org.apache.spark.streaming.twitter.TwitterReceiver.onStart(TwitterInputDStream.scala:72)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:149)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:131)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:597)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:587)
    at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:1974)
    at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:1974)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
18/09/29 10:27:10 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
18/09/29 10:27:10 ERROR ReceiverTracker: Deregistered receiver for stream 0: Stopped by driver
-------------------------------------------
Time: 1538242030000 ms
-------------------------------------------

-------------------------------------------
Time: 1538242030000 ms
-------------------------------------------

Вот фрагмент кода

Вот ошибка

Заранее спасибо.

1 Ответ

0 голосов
/ 29 сентября 2018

Вероятно, дело в конфигурации вашего инструмента сборки. Возможно, вы неправильно собираете свой uber-jar, и класс не найден.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...