Scala - Как выбрать последний элемент из RDD? - PullRequest
0 голосов
/ 04 февраля 2019

Сначала у меня было salesList: List[Sale], и чтобы получить идентификатор последней продажи в списке, который я использовал lastOption:

val lastSaleId: Option[Any] = salesList.lastOption.map(_.saleId)

Но теперь я изменил метод сList[Sale] для работы с salesListRdd: List[RDD[Sale]].Поэтому я изменил способ получения идентификатора последней продажи:

  val lastSaleId: Option[Any] = SparkContext
    .union(salesListRdd)
    .collect().toList
    .lastOption.map(_.saleId)

Я не уверен, что это лучший способ.Потому что здесь я все еще собираю RDD в List, который переносит его на узел драйвера, и это может привести к тому, что драйверу не хватит памяти.

Есть ли способ получить идентификатор последней продажи изСДР сохраняя первоначальный порядок записей?Не какой-либо вид сортировки, а способ, которым объекты Продажи изначально хранились в Списке?

Ответы [ 2 ]

0 голосов
/ 05 февраля 2019

Там как минимум два эффективных решения.Вы можете использовать top с zipWithIndex:

def lastValue[T](rdd: RDD[T]): Option[T] = {
  rdd.zipWithUniqueId.map(_.swap).top(1)(Ordering[Long].on(_._1)).headOption.map(_._2)
}

или top с пользовательским ключом:

 def lastValue[T](rdd: RDD[T]): Option[T] = {
   rdd.mapPartitionsWithIndex(
     (i, iter) => iter.zipWithIndex.map {  case (x, j) => ((i, j), x) }
   ).top(1)(Ordering[(Int, Long)].on(_._1)).headOption.map(_._2)
 }

Для первого требуется дополнительное действие для zipWithIndex, в то время какпоследний нет.

Перед использованием, пожалуйста, обязательно поймите ограничение. Цитирование документов :

Обратите внимание, что некоторые RDD, такие как возвращенные groupBy (), не гарантируют порядок элементов в разделе.Поэтому уникальный идентификатор, назначенный каждому элементу, не гарантируется и может даже измениться, если СДР будет переоценен.Если для обеспечения одинаковых назначений индекса требуется фиксированный порядок, следует отсортировать СДР с помощью sortByKey () или сохранить его в файл.

В частности, в зависимости от точного ввода, Unionможет не сохранить порядок ввода вообще.

0 голосов
/ 05 февраля 2019

Вы можете использовать zipWithIndex и сортировать по нему descending, чтобы последняя запись была сверху, а затем принять (1):

salesListRdd
    .zipWithIndex()
    .map({ case (x, y) => (y, x) })
    .sortByKey(ascending = false)
    .map({ case (x, y) => y })
    .take(1)

Решение взято отсюда: http://www.swi.com/spark-rdd-getting-bottom-records/ Однако, это крайне неэффективно, так как делает много перестановок разделов.

...