Я должен использовать загрузку файла один раз в день и использовать это в моей потоковой передаче. Для этого я пытаюсь прочитать файл и транслировать его. Ниже приведен код, который я использую.
def loadCustomer(sc: SparkContext, customerFilePath: String) = {
val customerList: Set[String] = if (customerFilePath.isEmpty) Set()
else {
sc.textFile(customerFilePath).collect().toSet
}
customerList
}
...
...
var customerList = loadCustomer(spark.sparkContext, params.customerFilePath)
// Filter by customer regular expression and customerList
val filteredTransactionStream = tranactionStream
.filter(x => IDRegex.pattern.matcher(x.customer).matches()).filter{ case(transactionRecord) => !(customerList.contains(transactionRecord.customer))}
Код работает нормально, пока потоковая работа не выполняется непрерывно. Но у меня появляется ошибка ниже, когда я пытаюсь перезапустить работу. Я обнаружил, что мы не можем использовать трансляцию, если у нас есть контрольные точки.
java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast to scala.collection.SetLike
Не могли бы вы дать мне знать, как решить эту проблему.
Спасибо