Я пытаюсь понять, почему не возникнут проблемы с сериализацией при выполнении этого фрагмента кода.
dstream.foreachRDD{rdd =>
rdd.cache()
val alternatives = restServer.get(“/v1/alternatives”).toSet
alternatives.foreach{alternative =>
val filteredRDD = rdd.filter(element => element.kind == alternative)
val formatter = new Formatter(alternative)
val recordRDD = filteredRDD.map(element => formatter(element))
recordRDD.foreachPartition{partition =>
val conn = DB.connect(server)
partition.foreach(element => conn.insert(alternative, element)
}
}
rdd.unpersist(true)
}
Так, например, при выполнении rdd.filter(element => element.kind == alternative)
не должны возникать проблемы с сериализацией с замыканием, потому что restServer
не является сериализуемым и находится в области действия замыкания, которое мы передаем операции фильтрации? Также по той же логике сам rdd находится в области действия для замыкания метода filter.