SparkStreaming / Scala / Twitter: java.lang.NoSuchMethodError: twitter4j.TwitterStream.addListener(Ltwitter4j/StreamListener;)V - PullRequest
0 голосов
/ 06 февраля 2020

Я пытаюсь получать твиты в IntelliJ с помощью SBT; использую Spark 2.4.4, Scala 2.11.12, jdk1.8.0_231. Запуск локальный, кластер я не использую.

Однако я получаю эту ошибку и не могу её исправить. Я уже по-разному менял зависимости, но не понимаю, что делаю неправильно... Есть ли у кого-нибудь идеи, которые помогут мне найти решение?

package com.sparkStreaming.twitter

import org.apache.spark
import org.apache.spark.rdd._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql
import org.apache.spark.sql.{Row, SQLContext, SparkSession}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.{StringType, StructField}
import org.apache.spark.streaming
import org.apache.spark.streaming.twitter.TwitterUtils

import org.apache.spark.storage.StorageLevel
import scala.io.Source
import scala.collection.mutable.HashMap
import java.io.File
import org.apache.log4j.Logger
import org.apache.log4j.Level
import sys.process.stringSeqToProcess

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

import org.apache.spark.streaming._
import org.apache.spark.streaming.{Seconds, StreamingContext}

/** Streams tweets that match a set of track terms and prints their text for every batch.
  *
  * Runs Spark Streaming locally. The Twitter OAuth credentials are handed to twitter4j through
  * the `twitter4j.oauth.*` system properties, because `TwitterUtils.createStream` is called
  * with `None` as the authorization.
  *
  * NOTE: a `java.lang.NoSuchMethodError` on `twitter4j.TwitterStream.addListener` when the
  * receiver starts is a classpath problem, not a bug in this code. The twitter4j version on the
  * classpath differs from the one spark-streaming-twitter was compiled against (4.0.4 for
  * bahir 2.1.0). Fix it in the build definition.
  */
object Tweets {

  def main(args: Array[String]): Unit = {
    // Twitter OAuth credentials. Environment variables take precedence so real secrets do not
    // have to be committed to source; the masked placeholders remain as the fallback.
    val consumerKey = sys.env.getOrElse("TWITTER_CONSUMER_KEY", "******")
    val consumerSecret = sys.env.getOrElse("TWITTER_CONSUMER_SECRET", "******")
    val accessToken = sys.env.getOrElse("TWITTER_ACCESS_TOKEN", "******")
    val accessTokenSecret = sys.env.getOrElse("TWITTER_ACCESS_TOKEN_SECRET", "******")

    System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
    System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
    System.setProperty("twitter4j.oauth.accessToken", accessToken)
    System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)

    // local[2]: the Twitter receiver permanently occupies one core, so at least one more core
    // is needed to actually process the received batches.
    val conf = new SparkConf().setAppName("Tweets").setMaster("local[2]")

    // Batch interval of the streaming context, in seconds.
    val batchIntervalSeconds = 10

    // Track terms, one per element. twitter4j comma-joins them, so this sends the same filter
    // as the former single "@realmadrid,$SAN" string, but the intent is explicit.
    val trackTerms = Array("@realmadrid", "$SAN")

    val ssc = new StreamingContext(conf, Seconds(batchIntervalSeconds))

    // None: authorize from the twitter4j.oauth.* system properties set above.
    val tweets = TwitterUtils.createStream(ssc, None, trackTerms)

    // A DStream of tweet texts (not an RDD of rows).
    val tweetTexts = tweets.map(_.getText)
    tweetTexts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

Это SBT

// https://mvnrepository.com/artifact/org.apache.spark/spark-core
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.4"

// https://mvnrepository.com/artifact/org.apache.spark/spark-sql
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.4"

// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.4.4" //% "provided"

// twitter4j is pinned to the version that spark-streaming-twitter 2.1.0 (below) declares in its
// pom: 4.0.4. That artifact was compiled against twitter4j 4.0.4 and calls
// TwitterStream.addListener(StreamListener) returning void. That exact method is not present in
// twitter4j 4.0.7, which shows up at runtime as
// java.lang.NoSuchMethodError: twitter4j.TwitterStream.addListener(Ltwitter4j/StreamListener;)V
// in TwitterReceiver.onStart.
// SBT resolves version conflicts to the highest requested version, so declaring 4.0.7 here would
// silently override the 4.0.4 that bahir pulls in transitively. Keep these in step with bahir.
// https://mvnrepository.com/artifact/org.twitter4j/twitter4j-core
libraryDependencies += "org.twitter4j" % "twitter4j-core" % "4.0.4"

// https://mvnrepository.com/artifact/org.twitter4j/twitter4j-stream
libraryDependencies += "org.twitter4j" % "twitter4j-stream" % "4.0.4"

// https://mvnrepository.com/artifact/org.apache.bahir/spark-streaming-twitter
libraryDependencies += "org.apache.bahir" %% "spark-streaming-twitter" % "2.1.0"

Это ошибка:

20/02/06 17:52:38 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NoSuchMethodError: twitter4j.TwitterStream.addListener(Ltwitter4j/StreamListener;)V
at org.apache.spark.streaming.twitter.TwitterReceiver.onStart(TwitterInputDStream.scala:72)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:149)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:131)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:601)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:591)
at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:2212)
at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:2212)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
20/02/06 17:52:38 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.NoSuchMethodError: 
twitter4j.TwitterStream.addListener(Ltwitter4j/StreamListener;)V
at org.apache.spark.streaming.twitter.TwitterReceiver.onStart(TwitterInputDStream.scala:72)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:149)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:131)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:601)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:591)
at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:2212)
at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:2212)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

20/02/06 17:52:38 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
20/02/06 17:52:38 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
20/02/06 17:52:38 INFO TaskSchedulerImpl: Cancelling stage 0
20/02/06 17:52:38 INFO TaskSchedulerImpl: Killing all running tasks in stage 0: Stage cancelled
20/02/06 17:52:38 INFO DAGScheduler: ResultStage 0 (start at Tweets.scala:80) failed in 0,248 s due to Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.NoSuchMethodError: twitter4j.TwitterStream.addListener(Ltwitter4j/StreamListener;)V
at org.apache.spark.streaming.twitter.TwitterReceiver.onStart(TwitterInputDStream.scala:72)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:149)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:131)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:601)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:591)
at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:2212)
at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:2212)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
20/02/06 17:52:38 ERROR ReceiverTracker: Receiver has been stopped. Try to restart it.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.NoSuchMethodError: twitter4j.TwitterStream.addListener(Ltwitter4j/StreamListener;)V
at org.apache.spark.streaming.twitter.TwitterReceiver.onStart(TwitterInputDStream.scala:72)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:149)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:131)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:601)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:591)
at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:2212)
at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:2212)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)

Ответы [ 2 ]

1 голос
/ 07 февраля 2020

Артефакт spark-streaming-twitter 2.1.0 зависит от другой версии twitter4j, что видно из его pom.xml, который можно найти на сайте Maven:

<dependency>
<groupId>org.twitter4j</groupId>
<artifactId>twitter4j-stream</artifactId>
<version>4.0.4</version>
</dependency>

Вероятно, более новая версия twitter4j не обратно совместима, поэтому переход на неё не сработает (SBT обычно выбирает самую высокую версию, запрошенную любой из зависимостей, если вы явно не укажете другую). Удалите свои явные зависимости twitter4j и попробуйте снова.

0 голосов
/ 14 февраля 2020

У меня заработало со следующими зависимостями:

{

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.11</artifactId>
  <version>1.5.2</version>
</dependency>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>1.5.2</version>
</dependency>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.4.4</version>
</dependency>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-twitter_2.11</artifactId>
  <version>1.6.2</version>
</dependency>

}

Эти сообщения помогли мне:

Исключение Spark Twitter Streaming: (org.apache.spark.Logging) ClassNotFound

Зависимости для потоковой передачи Spark и Twitter-Streaming в SBT

...