Понимание Flink - задача не сериализуема - PullRequest
0 голосов
/ 18 сентября 2018

Я работаю над проектом Flink и столкнулся с проблемой, которую мне удалось решить с помощью ответов от Stackoverflow.Однако мне не понятно , почему предлагаемые решения действительно работают, и я нашел информацию по этой теме немногочисленной.Рассмотрим следующий код:

object DeCP {
  def main(args: Array[String]): Unit = {
    val params: ParameterTool = ParameterTool.fromArgs(args)

    // Get the execution environment and read the data
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val queryPoints: DataSet[Point] = readQueryPoints(env, params)
    val points: DataSet[Point] = readFeatureVector(env, params)

    // Process the query points
    queryPoints
      .map(new KNNRich)
      .withBroadcastSet(points, "pointsIn")
      .print
  }

  final class KNNRich extends RichMapFunction[Point, (Point, Vector[Point])]{
    private var pointsIn: Traversable[Point] = _

    override def open(parameters: Configuration): Unit =
      pointsIn = getRuntimeContext.getBroadcastVariable[Point]("pointsIn").asScala

    def map(queryPoint: Point): (Point, Vector[Point]) = {
      val dataSetIn = ExecutionEnvironment.getExecutionEnvironment
                                          .fromCollection(pointsIn.toVector)
      val cluster = new Cluster(dataSetIn, queryPoint)
      val knn = cluster.kNearestNeighbor(queryPoint, 3) // This call causes problems
      (queryPoint, knn.collect.toVector)
    }
  }
}

Класс Cluster и сопутствующий объект определены как:

class Cluster(var points: DataSet[Point],
              var clusterLeader: Point) extends Serializable {
  private var queryPoint: Point = _

  def distance(p: Point): Point = {
    p.eucDist(queryPoint)
  }

  def kNearestNeighbor(queryPoint: Point, k: Int): DataSet[Point] = {
    this.queryPoint = queryPoint

    this.points.map{p => distance(p)} // Task not serializable
    this.points.map{p => p.eucDist(queryPoint)} // Works
    this.points.map{p => Cluster.staticDistance(queryPoint, p)} // Works
  }
}

object Cluster {
  def staticDistance(queryPoint: Point, p: Point): Point = {
    p.eucDist(queryPoint)
  }
}

Вызов метода distance вызывает задачу, а не сериализованное исключение, но заменяетвызов метода с определением устраняет проблему.Точно так же, определение точно такого же метода, как члена объекта-компаньона, позволяет коду функционировать должным образом.

Почему первый звонок не работает, а два других звонка работают?Что произойдет, если у вас есть более сложный поток выполнения в классе, который нелегко заменить в качестве методов объекта-компаньона?

1 Ответ

0 голосов
/ 27 сентября 2018

Выполняя преобразования DataSet, вы просто создаете логический план вашего конвейера.Конвейер передается в кластер с помощью вызова execute/print/collect.

Когда конвейер передается в кластер, каждая функция, такая как ваш RichMapFunction, сериализуется, отправляется в кластер, дублируется для каждого параллельного экземпляра,и выполняется независимо.Когда вы получаете исключение «Задача не сериализуема», это означает, что ваш RichMapFunction транзитивно ссылается на переменные / объекты вне этого класса.Вы должны убедиться, что функция является независимым блоком.

Вызывая points.map{}, вы неявно создаете MapFunction.Но этот MapFunction имеет ссылку на экземпляр Cluster, поэтому не является независимым.Flink пытается также сериализовать Cluster, но не удается.Если distance будет статическим (определено в сопутствующем объекте), то Cluster также не нуждается в сериализации.

Кстати, еще одна проблема вашего примера состоит в том, что вы не используетеDataSet API, как задумано.Обычно вы не должны создавать конвейер в работающем конвейере.Это также может вызвать непреднамеренные побочные эффекты.

...