Когда я тестировал коды по отдельному параметру, он прекрасно работал.
Например: запустить эти:
val spark = SparkSession
.builder()
.appName("CommonWords")
.config("spark.master", "local")
.getOrCreate()
import spark.sqlContext.implicits._
val inputlist1 = sc.textFile(inputFile1).collect()
val words1 = inputlist1.flatMap(_.split(" "))
.map(word => word.replaceAll("[^a-zA-Z ]", "").toLowerCase().trim())
.filter(word => !stopwords.contains(word) && word.trim().length() != 0)
val newdf1 = spark.sparkContext.parallelize(words1).toDF("word")
val count1agg = newdf1.groupBy("word").count()
val inputlist2 = sc.textFile(inputFile2).collect()
val words2 = inputlist2.flatMap(_.split(" "))
.map(word => word.replaceAll("[^a-zA-Z ]", "").toLowerCase().trim())
.filter(word => !stopwords.contains(word) && word.trim().length() != 0)
val newdf2 = spark.sparkContext.parallelize(words2).toDF("word")
val count2agg = newdf2.groupBy("word").count()
Но когда я пытаюсь обернуть вышеупомянутое в функцию (как показано ниже), она больше не работает:
def wordCount(sc: SparkContext, spark: SparkSession,stopwords: Array[String], inputFile: String): DataFrame = {
val inputlist = sc.textFile(inputFile).collect()
val words = inputlist.flatMap(_.split(" "))
.map(word => word.replaceAll("[^a-zA-Z ]", "").toLowerCase().trim())
.filter(word => !stopwords.contains(word) && word.trim().length() != 0)
val newdf = spark.sparkContext.parallelize(words).toDF("word")
var newdf_agg = newdf.groupBy("word").count()
return newdf_agg
}
Основное сообщение об ошибке, которое я получаю: java .lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast to [B