Я запускаю параллельные задания Spark в одном приложении Spark, и примерно в 2% случаев я получаю сообщение об ошибке, подобное этому:
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.UnsupportedOperationException: No Encoder found for scala.Option[String]
- field (class: "scala.Option", name: "my_field")
- root class: "my.package.Clazz"
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at my.package.Application
Это происходит из org.apache.spark.sql.catalyst.ScalaReflection
.
Согласно документации Spark, в приложении Spark "полностью поточно-ориентирован и поддерживает этот сценарий использования для включения приложений, которые обслуживают несколько запросов": https://spark.apache.org/docs/latest/job-scheduling.html
Для моего приложения необходим параллелизм.
Особый вариант использования включает считывание нескольких ключей секционирования из Cassandra в набор данных [T], с которым драйвер Spark Cassandra в настоящее время плохо справляется.Мой код выглядит так:
var continue = true
while (continue)
futures += executor.submit(
new Runnable {
override def run(): Unit = {
val factory = new ClassBasedRowReaderFactory[T]()
val reader = factory.rowReader(tableDef, caseClassColumns)
val sqlHour = new sql.Timestamp(calendar.getTime.getTime)
val rawRows = new ListBuffer[T]()
session.execute(preparedStatement.bind(sqlHour)).forEach(new Consumer[Row] {
override def accept(row: Row): Unit = {
val metadata = CassandraRowMetadata(tableColumns)
rawRows += reader.read(row, metadata)
}
})
val rowsDataset = sparkSession
.createDataset(rawRows)
.coalesce(1)
val destination = ...
rowsDataset.write
.mode(SaveMode.ErrorIfExists)
.parquet(destination)
}
}
)
calendar.add(Calendar.HOUR_OF_DAY, 1)
if (...) {
continue = false
}
}
futures.foreach(_.get())
, а конкретный T, который я использую, определяется как
case class Clazz(
my_field: Option[String],
...
)
Я использую Spark 2.4.0.Меня беспокоит, что это работает около 98% времени и не работает 2% времени (по тем же данным).Я неправильно планирую параллельные задания или это проблема в Spark?