Flink Scala - метод сравнения нарушает общий договор - PullRequest
0 голосов
/ 13 октября 2018

Я пишу проект во Флинке, который предусматривает потоковую передачу набора точек запроса по пакетным данным и выполнение полного последовательного сканирования для поиска ближайших соседей.То, что должно быть простой операцией сортировки для одного значения Float, приводит к нарушению общей ошибки контракта.Основной метод определен как:

object StreamingDeCP{
  var points: Vector[Point] = _

  def main(args: Array[String]): Unit = {
    val queryPointsVec: Vector[Point] = ... // Read from file
    val pointsVec: Vector[Point] = ...      // Read from file

    val streamEnv: StreamExecutionEnvironment = 
                   StreamExecutionEnvironment.getExecutionEnvironment
    val queryPoints = streamEnv.fromCollection(queryPointsVec)

    points = pointsVec
    queryPoints.map(new StreamingSequentialScan)

    streamEnv.execute("StreamingDeCP")
  }

  final class StreamingSequentialScan 
                    extends MapFunction[Point, (Point, Vector[Point])] {

    def map(queryPoint: Point): (Point, Vector[Point]) = {
      val nn = points
                .map{ _.eucDist(queryPoint) }
                .sorted

      (queryPoint, nn)
    }
  }
}

Класс и объект-компаньон Point:

case class Point(pointID: Long,
                 descriptor: Vector[Float]) extends Serializable {
  var distance: Float = Float.MaxValue

  def eucDist(that: Point): Point = {
    // Simple arithmetic to calculate and set the distance variable
  }
}

object Point{
  implicit def orderByDistance[A <: Point]: Ordering[A] =
    Ordering.by(_.distance)
}

Вот некоторые заметки о том, что я пробовал, чтобы точно определитьпричина:

  • Утверждено, что все значения distance находятся между Float.MaxValue и Float.MinValue, и значения с отрицательным нулем отсутствуют
  • Утверждено, что нет повторяющихся distance переменныхв рамках одной и той же операции сортировки (мой вариант использования позволяет это сделать, но я подумал, что проверю это на всякий случай)
  • Преобразовал число с плавающей точкой в ​​целочисленное значение и вместо этого отсортировал по этим значениям
  • Добавленоявное упорядочение Point вместо использования имплицитов
  • Сортировка по уникальному pointID вместо distance, который работает , но бесполезен для контекста этой проблемы.

Я также отметил, что выполнение одного и того же кода не всегда надежно воспроизводит ошибку.Я читаю Vector[Points] полностью детерминированным образом, поэтому единственной возможной причиной такого поведения должен быть планировщик Flink или какое-либо вычисление с сохранением состояния в методе сортировки.

Другие посты на ту же тему, по-видимому, связаны спропущенный сценарий в пользовательском компараторе, но это должна быть простая операция сортировки для одного значения типа Float, поэтому я понятия не имею, что может вызвать проблему.

1 Ответ

0 голосов
/ 13 октября 2018

Я не знаком с Flink, но у меня нет никаких оснований предполагать, что он будет выполнять каждую смущающе параллельную MapFunction задачу в последовательном однопоточном режиме.

Поскольку ваш Point содержит var с, а эти var изменены в методе map MapFunction, код должен завершиться с ошибкой «Метод сравнения нарушает его общий контракт» - исключение всякий раз, когда MapFunction выполняется с параллелизмом != 1.

Чтобы избежать каких-либо побочных эффектов внутри функции map, вы можете изменить код следующим образом:

  • Удалите все var s из main, сделайте points неизменным val.
  • Удалите все виды var s из Point
  • Реализуйте метод

    def eucDist(other: Point): Double
    

    , который просто вычисляет расстояние до другой точки (ничего не изменяя).

  • Использование sortBy:

    val nn = points.sortBy(_.eucDist(queryPoint))
    

В качестве альтернативы, если вы хотите избежать повторного вычисления евклидова расстояния несколько раз во время сортировкиЗатем, предварительно рассчитайте расстояния один раз, отсортируйте, а затем выбросьте расстояния:

val nn = points.map(p => (p, p.eucDist(queryPoint))).sortBy(_._2).map(_._1)
...