Там как минимум два эффективных решения.Вы можете использовать top
с zipWithIndex
:
def lastValue[T](rdd: RDD[T]): Option[T] = {
rdd.zipWithUniqueId.map(_.swap).top(1)(Ordering[Long].on(_._1)).headOption.map(_._2)
}
или top
с пользовательским ключом:
def lastValue[T](rdd: RDD[T]): Option[T] = {
rdd.mapPartitionsWithIndex(
(i, iter) => iter.zipWithIndex.map { case (x, j) => ((i, j), x) }
).top(1)(Ordering[(Int, Long)].on(_._1)).headOption.map(_._2)
}
Для первого требуется дополнительное действие для zipWithIndex
, в то время какпоследний нет.
Перед использованием, пожалуйста, обязательно поймите ограничение. Цитирование документов :
Обратите внимание, что некоторые RDD, такие как возвращенные groupBy (), не гарантируют порядок элементов в разделе.Поэтому уникальный идентификатор, назначенный каждому элементу, не гарантируется и может даже измениться, если СДР будет переоценен.Если для обеспечения одинаковых назначений индекса требуется фиксированный порядок, следует отсортировать СДР с помощью sortByKey () или сохранить его в файл.
В частности, в зависимости от точного ввода, Union
может не сохранить порядок ввода вообще.