Почему Spark не сериализуемое исключение возникает при смене RDD на DataFrame? - PullRequest
0 голосов
/ 03 марта 2020

Я использую структурированную потоковую передачу, и следующий код работает

val j = new Jedis() // an redis client which is not serializable.

xx.writeStream.foreachBatch{(batchDF: DataFrame, batchId: Long) => {
  j.xtrim(...)... // call function of Jedis here
  batchDF.rdd.mapPartitions(...)
}}

Но следующий код выдает исключение, object not serializable (class: redis.clients.jedis.Jedis, value: redis.clients.jedis.Jedis@a8e0378)

Код имеет только одно место изменения (измените RDD на DataFrame) :

val j = new Jedis() // an redis client which is not serializable.

xx.writeStream.foreachBatch{(batchDF: DataFrame, batchId: Long) => {
  j.xtrim(...)... // call function of Jedis here
  batchDF.mapPartitions(...)  // only change is change batchDF.rdd to batchDF
}}

Мой код Jedis должен выполняться на драйвере и никогда не достигать исполнителя. Я полагаю, что Spark RDD и DataFrame должны иметь похожий APIS? Почему это происходит?


Я использовал ctrl для go в коде нижнего уровня. batchDF.mapPartitions переходит к

  @Experimental
  @InterfaceStability.Evolving
  def mapPartitions[U : Encoder](func: Iterator[T] => Iterator[U]): Dataset[U] = 
  {
    new Dataset[U](
      sparkSession,
      MapPartitions[T, U](func, logicalPlan),
      implicitly[Encoder[U]])
  }

, а batchDF.rdd.mapPartitions переходит к

    def mapPartitions[U: ClassTag](
      f: Iterator[T] => Iterator[U],
      preservesPartitioning: Boolean = false): RDD[U] = withScope {
    val cleanedF = sc.clean(f)
    new MapPartitionsRDD(
      this,
      (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
      preservesPartitioning)
  }

Моя версия Spark - 2.4.3.

Моя самая простая версия кода ниже и я только что нашел что-то еще ...

val j = new Jedis() // an redis client which is not serializable.

xx.writeStream.foreachBatch{(batchDF: DataFrame, batchId: Long) => {
  j.xtrim(...)... // call function of Jedis here
  batchDF.mapPartitions(x => {
    val arr = x.grouped(2).toArray // this line matters
  })
  // only change is change batchDF.rdd to batchDF
}}

1 Ответ

1 голос
/ 03 марта 2020

см. эта реализация API DataFrame

внутренне вызывает rdd.mapPartitions вашей функции.

     /**
       * Returns a new RDD by applying a function to each partition of this DataFrame.
       * @group rdd
       * @since 1.3.0
       */
      def mapPartitions[R: ClassTag](f: Iterator[Row] => Iterator[R]): RDD[R] = {
        rdd.mapPartitions(f)
      }

Нет разницы, где вы могли бы сделать ошибка.

AFAIK, в идеале это должно быть так

 batchDF.mapPartitions { yourparition =>
// better to create a JedisPool and take object rather than new Jedis
 val j = new Jedis() 
val result = yourparition.map {
// do some process here
}

j.close // release and take care of connections/ resources here
result
}
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...