Я пытаюсь сохранять DataSet в индекс ElasticSearch каждый день (расписание с Oozie), но у меня иногда возникает эта ошибка java.lang.NoClassDefFoundError: Не удалось инициализировать класс org.apache.spark.util.JsonProtocol поэтому работа сразу же провалилась. Я не знаю, почему появляется эта ошибка.
Код:
private def readSource1()(implicit spark: SparkSession): DataFrame = {
import spark.implicits._
val sourceName = "dictionary.source1"
val plantsPath: String = config.getString("sources." + sourceName + ".path")
spark.read
.option("delimiter", ";")
.option("header", "true")
.csv(plantsPath)
.select('id as "sourceId", 'assembly_site_id)
}
private def readSource2()(implicit spark: SparkSession): DataFrame = {
import spark.implicits._
val source2: SourceIO = SourceManager(config)("source2")
(startDate, endDate) match {
case (Some(sd), Some(ed)) ⇒ source2.loadDf()
.where('assemblyEndDate.between(Date.valueOf(sd), Date.valueOf(ed)) ||
'tctDate.between(Date.valueOf(sd), Date.valueOf(ed)))
case _ ⇒ source2.loadDf()
}
}
def saveSourceToEs(implicit sparkSession: SparkSession): Unit = {
val source1: DataFrame = readSource1()
val source2: DataFrame = readSource2()
val source: Dataset[Source] = buildSource(this.getSource(source1, source2))
source.saveToEs(s"source_${createDateString()}/_doc")
}
object SourceIndexer extends SparkApp with Configurable with Logging {
val config: Config = ConfigFactory.load()
def apply(
sourceID: Option[String] = None,
startDate: Option[LocalDate] = None,
endDate: Option[LocalDate] = None
): SourceIndexer = {
new SourceIndexer(config, sourceID, startDate, endDate)
}
def main(args: Array[String]): Unit = {
try {
val bootConfig = BootConfig.parseSourceIndexer(args)
this.apply(bootConfig.sourceID, bootConfig.startDate, bootConfig.endDate)
.saveSourceToEs(spark)
} finally {
spark.sparkContext.stop()
}
}
}
Спасибо за вашу помощь.