scala .MatchError Сообщение всякий раз, когда я запускаю Scala Object - PullRequest
0 голосов
/ 07 мая 2020

Следующий фрагмент кода является частью приложения Twitter Streaming, которое я использую с Spark Streaming:

val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
val filters = args.takeRight(args.length - 4)

// Set the system properties so that Twitter4j library used by twitter stream
// can use them to generate OAuth credentials
System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
System.setProperty("twitter4j.oauth.accessToken", accessToken)
System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)

Каждый раз, когда я go запускаю программу, я получаю следующую ошибку:

Exception in thread "main" scala.MatchError: [Ljava.lang.String;@323659f8 (of class [Ljava.lang.String;)
at SparkPopularHashTags$.main(SparkPopularHashTags.scala:18)
at SparkPopularHashTags.main(SparkPopularHashTags.scala)

Строка 18:

val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)

У меня есть файл Twitter4j.properties, сохраненный в моей папке F: \ Software \ ItelliJ \ Projects \ twitterStreamApp \ sr c, и он отформатирован так:

oauth.consumerKey=***
oauth.consumerSecret=***
oauth.accessToken=***
oauth.accessTokenSecret=***

Где «*» - мои ключи без кавычек (например, oauth.consumerKey = h12b31289fh7139fbh138ry)

Кто-нибудь может мне помочь с этим, пожалуйста?

import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.twitter._
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.{ SparkContext, SparkConf }
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume._
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder

object SparkPopularHashTags {
  val conf = new SparkConf().setMaster("local[4]").setAppName("Spark Streaming - PopularHashTags")
  val sc = new SparkContext(conf)

  def main(args: Array[String]) {

    sc.setLogLevel("WARN")

    val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
//    val filters = args.takeRight(args.length - 4)


    args.lift(0).foreach { consumerKey =>
      System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
    }
    args.lift(1).foreach { consumerSecret =>
      System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
    }
    args.lift(2).foreach { accessToken =>
      System.setProperty("twitter4j.oauth.accessToken", accessToken)
    }
    args.lift(3).foreach { accessTokenSecret =>
      System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)
    }

    val filters = args.drop(4)
    // Set the system properties so that Twitter4j library used by twitter stream
    // can use them to generate OAuth credentials
//    System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
//    System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
//    System.setProperty("twitter4j.oauth.accessToken", accessToken)
//    System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)

    // Set the Spark StreamingContext to create a DStream for every 5 seconds
    val ssc = new StreamingContext(sc, Seconds(5))

    val stream = TwitterUtils.createStream(ssc, None, filters)

    // Split the stream on space and extract hashtags
    val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))

    // Get the top hashtags over the previous 60 sec window
    val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
      .map { case (topic, count) => (count, topic) }
      .transform(_.sortByKey(false))

    // Get the top hashtags over the previous 10 sec window
    val topCounts10 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(10))
      .map { case (topic, count) => (count, topic) }
      .transform(_.sortByKey(false))

    // print tweets in the correct DStream
    stream.print()

    // Print popular hashtags
    topCounts60.foreachRDD(rdd => {
      val topList = rdd.take(10)
      println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
      topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
    })
    topCounts10.foreachRDD(rdd => {
      val topList = rdd.take(10)
      println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count()))
      topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

Ответы [ 2 ]

1 голос
/ 07 мая 2020

Это проблема:

val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)

Это не сработает, если аргументов меньше 4, потому что он не может соответствовать четырем значениям в левой части.

Вместо этого, вам необходимо протестировать элементы args индивидуально, чтобы убедиться, что они присутствуют. Например

args.lift(0).foreach { consumerKey =>
  System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
}
args.lift(1).foreach { consumerSecret =>
  System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
}
args.lift(2).foreach { accessToken =>
  System.setProperty("twitter4j.oauth.accessToken", accessToken)
}
args.lift(3).foreach { accessTokenSecret =>
  System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)
}

val filters = args.drop(4)
0 голосов
/ 07 мая 2020

Это должно происходить только в том случае, если вы не устанавливаете аргументы вашей Программы или устанавливаете недостаточное число. аргументов т.е. меньше 4

...