Проблема с трансляцией в контрольно-пропускном пункте - PullRequest
0 голосов
/ 05 июля 2019

Я должен использовать загрузку файла один раз в день и использовать это в моей потоковой передаче. Для этого я пытаюсь прочитать файл и транслировать его. Ниже приведен код, который я использую.

    def loadCustomer(sc: SparkContext, customerFilePath: String) = {
      val customerList: Set[String] = if (customerFilePath.isEmpty) Set()
      else {
        sc.textFile(customerFilePath).collect().toSet
      }
      customerList
    }
    ...
    ...

    var customerList = loadCustomer(spark.sparkContext, params.customerFilePath)

    // Filter by customer regular expression and customerList
    val filteredTransactionStream = tranactionStream
                            .filter(x => IDRegex.pattern.matcher(x.customer).matches()).filter{ case(transactionRecord) => !(customerList.contains(transactionRecord.customer))}

Код работает нормально, пока потоковая работа не выполняется непрерывно. Но у меня появляется ошибка ниже, когда я пытаюсь перезапустить работу. Я обнаружил, что мы не можем использовать трансляцию, если у нас есть контрольные точки.

java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast to scala.collection.SetLike

Не могли бы вы дать мне знать, как решить эту проблему.

Спасибо

1 Ответ

0 голосов
/ 09 июля 2019

Вы должны остановить приложение изящно, иначе мудрое Приложение будет остановлено с половиной сохраненных данных, и при попытке перезапустить его может не сериализоваться, потому что данные не доступны полностью.

...