Я новичок в скале и искре.Мне нужны некоторые предложения, поскольку я борюсь со множеством ошибок.
Структура кода:
def main () = {
implicit val spark: SparkSession = SparkSession.builder().appName("test").getOrCreate()
spark.sparkContext.addSparkListener(new SparkListener {
override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
spark.sqlContext.clearCache()
Utility.init()
}
})
// the rest code below initiates the job as a spark stream
}
Я хочу, чтобы метод Utility.init () выполнялся в задании потоковой передачи искры в драйвере /master, а не в execute, каждый раз перед началом задания, и он должен продолжать работать, как остальные потоковые задачи.
приведенный выше формат кода вызывает ошибку, что каждый раз при запуске потокового задания Utility.init ()метод вызывается как асинхронный, и до того, как его выходные данные сгенерированы, его обработка параллельно продолжается до следующего шага, приводящего к сбою.
Как убедиться, что этот конкретный метод запускается внутри драйвера в задании искрового потока?
Любая помощь / предложения полезны!Спасибо:)