Как исправить «Искатель не найден для scala.Option [String]» в Spark - PullRequest
0 голосов
/ 19 декабря 2018

Я запускаю параллельные задания Spark в одном приложении Spark, и примерно в 2% случаев я получаю сообщение об ошибке, подобное этому:

Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.UnsupportedOperationException: No Encoder found for scala.Option[String]
- field (class: "scala.Option", name: "my_field")
- root class: "my.package.Clazz"
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at my.package.Application

Это происходит из org.apache.spark.sql.catalyst.ScalaReflection.

Согласно документации Spark, в приложении Spark "полностью поточно-ориентирован и поддерживает этот сценарий использования для включения приложений, которые обслуживают несколько запросов": https://spark.apache.org/docs/latest/job-scheduling.html

Для моего приложения необходим параллелизм.

Особый вариант использования включает считывание нескольких ключей секционирования из Cassandra в набор данных [T], с которым драйвер Spark Cassandra в настоящее время плохо справляется.Мой код выглядит так:

var continue = true
while (continue)
  futures += executor.submit(
    new Runnable {
      override def run(): Unit = {
        val factory = new ClassBasedRowReaderFactory[T]()
        val reader = factory.rowReader(tableDef, caseClassColumns)
        val sqlHour = new sql.Timestamp(calendar.getTime.getTime)

        val rawRows = new ListBuffer[T]()
        session.execute(preparedStatement.bind(sqlHour)).forEach(new Consumer[Row] {
          override def accept(row: Row): Unit = {
            val metadata = CassandraRowMetadata(tableColumns)
            rawRows += reader.read(row, metadata)
          }
        })

        val rowsDataset = sparkSession
          .createDataset(rawRows)
          .coalesce(1)

        val destination = ...
        rowsDataset.write
          .mode(SaveMode.ErrorIfExists)
          .parquet(destination)
      }
    }
  )

  calendar.add(Calendar.HOUR_OF_DAY, 1)
  if (...) {
    continue = false
  }
}

futures.foreach(_.get())

, а конкретный T, который я использую, определяется как

case class Clazz(
  my_field: Option[String],
  ...
)

Я использую Spark 2.4.0.Меня беспокоит, что это работает около 98% времени и не работает 2% времени (по тем же данным).Я неправильно планирую параллельные задания или это проблема в Spark?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...