Я использую структурированную потоковую передачу, и следующий код работает
val j = new Jedis() // an redis client which is not serializable.
xx.writeStream.foreachBatch{(batchDF: DataFrame, batchId: Long) => {
j.xtrim(...)... // call function of Jedis here
batchDF.rdd.mapPartitions(...)
}}
Но следующий код выдает исключение, object not serializable (class: redis.clients.jedis.Jedis, value: redis.clients.jedis.Jedis@a8e0378)
Код имеет только одно место изменения (измените RDD на DataFrame) :
val j = new Jedis() // an redis client which is not serializable.
xx.writeStream.foreachBatch{(batchDF: DataFrame, batchId: Long) => {
j.xtrim(...)... // call function of Jedis here
batchDF.mapPartitions(...) // only change is change batchDF.rdd to batchDF
}}
Мой код Jedis
должен выполняться на драйвере и никогда не достигать исполнителя. Я полагаю, что Spark RDD и DataFrame должны иметь похожий APIS? Почему это происходит?
Я использовал ctrl для go в коде нижнего уровня. batchDF.mapPartitions
переходит к
@Experimental
@InterfaceStability.Evolving
def mapPartitions[U : Encoder](func: Iterator[T] => Iterator[U]): Dataset[U] =
{
new Dataset[U](
sparkSession,
MapPartitions[T, U](func, logicalPlan),
implicitly[Encoder[U]])
}
, а batchDF.rdd.mapPartitions
переходит к
def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = withScope {
val cleanedF = sc.clean(f)
new MapPartitionsRDD(
this,
(context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
preservesPartitioning)
}
Моя версия Spark - 2.4.3.
Моя самая простая версия кода ниже и я только что нашел что-то еще ...
val j = new Jedis() // an redis client which is not serializable.
xx.writeStream.foreachBatch{(batchDF: DataFrame, batchId: Long) => {
j.xtrim(...)... // call function of Jedis here
batchDF.mapPartitions(x => {
val arr = x.grouped(2).toArray // this line matters
})
// only change is change batchDF.rdd to batchDF
}}