Я пишу проект во Флинке, который предусматривает потоковую передачу набора точек запроса по пакетным данным и выполнение полного последовательного сканирования для поиска ближайших соседей.То, что должно быть простой операцией сортировки для одного значения Float, приводит к нарушению общей ошибки контракта.Основной метод определен как:
object StreamingDeCP{
var points: Vector[Point] = _
def main(args: Array[String]): Unit = {
val queryPointsVec: Vector[Point] = ... // Read from file
val pointsVec: Vector[Point] = ... // Read from file
val streamEnv: StreamExecutionEnvironment =
StreamExecutionEnvironment.getExecutionEnvironment
val queryPoints = streamEnv.fromCollection(queryPointsVec)
points = pointsVec
queryPoints.map(new StreamingSequentialScan)
streamEnv.execute("StreamingDeCP")
}
final class StreamingSequentialScan
extends MapFunction[Point, (Point, Vector[Point])] {
def map(queryPoint: Point): (Point, Vector[Point]) = {
val nn = points
.map{ _.eucDist(queryPoint) }
.sorted
(queryPoint, nn)
}
}
}
Класс и объект-компаньон Point
:
case class Point(pointID: Long,
descriptor: Vector[Float]) extends Serializable {
var distance: Float = Float.MaxValue
def eucDist(that: Point): Point = {
// Simple arithmetic to calculate and set the distance variable
}
}
object Point{
implicit def orderByDistance[A <: Point]: Ordering[A] =
Ordering.by(_.distance)
}
Вот некоторые заметки о том, что я пробовал, чтобы точно определитьпричина:
- Утверждено, что все значения
distance
находятся между Float.MaxValue и Float.MinValue, и значения с отрицательным нулем отсутствуют - Утверждено, что нет повторяющихся
distance
переменныхв рамках одной и той же операции сортировки (мой вариант использования позволяет это сделать, но я подумал, что проверю это на всякий случай) - Преобразовал число с плавающей точкой в целочисленное значение и вместо этого отсортировал по этим значениям
- Добавленоявное упорядочение
Point
вместо использования имплицитов - Сортировка по уникальному
pointID
вместо distance
, который работает , но бесполезен для контекста этой проблемы.
Я также отметил, что выполнение одного и того же кода не всегда надежно воспроизводит ошибку.Я читаю Vector[Points]
полностью детерминированным образом, поэтому единственной возможной причиной такого поведения должен быть планировщик Flink или какое-либо вычисление с сохранением состояния в методе сортировки.
Другие посты на ту же тему, по-видимому, связаны спропущенный сценарий в пользовательском компараторе, но это должна быть простая операция сортировки для одного значения типа Float, поэтому я понятия не имею, что может вызвать проблему.