Я работаю над проектом Flink и столкнулся с проблемой, которую мне удалось решить с помощью ответов от Stackoverflow.Однако мне не понятно , почему предлагаемые решения действительно работают, и я нашел информацию по этой теме немногочисленной.Рассмотрим следующий код:
object DeCP {
def main(args: Array[String]): Unit = {
val params: ParameterTool = ParameterTool.fromArgs(args)
// Get the execution environment and read the data
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val queryPoints: DataSet[Point] = readQueryPoints(env, params)
val points: DataSet[Point] = readFeatureVector(env, params)
// Process the query points
queryPoints
.map(new KNNRich)
.withBroadcastSet(points, "pointsIn")
.print
}
final class KNNRich extends RichMapFunction[Point, (Point, Vector[Point])]{
private var pointsIn: Traversable[Point] = _
override def open(parameters: Configuration): Unit =
pointsIn = getRuntimeContext.getBroadcastVariable[Point]("pointsIn").asScala
def map(queryPoint: Point): (Point, Vector[Point]) = {
val dataSetIn = ExecutionEnvironment.getExecutionEnvironment
.fromCollection(pointsIn.toVector)
val cluster = new Cluster(dataSetIn, queryPoint)
val knn = cluster.kNearestNeighbor(queryPoint, 3) // This call causes problems
(queryPoint, knn.collect.toVector)
}
}
}
Класс Cluster и сопутствующий объект определены как:
class Cluster(var points: DataSet[Point],
var clusterLeader: Point) extends Serializable {
private var queryPoint: Point = _
def distance(p: Point): Point = {
p.eucDist(queryPoint)
}
def kNearestNeighbor(queryPoint: Point, k: Int): DataSet[Point] = {
this.queryPoint = queryPoint
this.points.map{p => distance(p)} // Task not serializable
this.points.map{p => p.eucDist(queryPoint)} // Works
this.points.map{p => Cluster.staticDistance(queryPoint, p)} // Works
}
}
object Cluster {
def staticDistance(queryPoint: Point, p: Point): Point = {
p.eucDist(queryPoint)
}
}
Вызов метода distance
вызывает задачу, а не сериализованное исключение, но заменяетвызов метода с определением устраняет проблему.Точно так же, определение точно такого же метода, как члена объекта-компаньона, позволяет коду функционировать должным образом.
Почему первый звонок не работает, а два других звонка работают?Что произойдет, если у вас есть более сложный поток выполнения в классе, который нелегко заменить в качестве методов объекта-компаньона?