Как разделить, ранжировать и отсортировать данные с помощью RDD pyspark? - PullRequest
0 голосов
/ 07 мая 2020

Я использую pyspark и имею RDD следующего формата:

RDD1 = (age, code, count)

Мне нужно найти код с наибольшим числом для каждого возраста.

Я завершил это в dataframe, используя Window function и partitioning by age:

df1 = df.withColumn("rank", rank().over(Window.partitionBy("age") 
\.orderBy(desc("count")))).sort("age", desc("count"))

df2 = df1.select("age", "code", "count", "rank").where("rank = 1")

Однако мне нужно найти тот же результат, используя только RDD operations, но я ' м не совсем уверен, как это сделать. Любые предложения были бы очень полезны!

Ответы [ 2 ]

1 голос
/ 08 мая 2020

Попробуйте это (для pyspark):

rdd1.keyBy(lambda x: x[0]).reduceByKey(lambda x,y: x if x[2] >= y[2] else y).values().collect()

Где:

  1. используйте keyBy(lambda x: x[0]) для преобразования исходного RDD в парный RDD с элементом (age, (age, code, count))
  2. используйте reduceByKey(lambda x,y: x if x[2] >= y[2] else y), чтобы найти элемент с max (count) для каждого возраста
  3. возьмите values(), что составляет (age, code, count)

Примечание: это принимает только один элемент в случае связей на максимальном значении

0 голосов
/ 07 мая 2020

К сожалению, в настоящий момент window functions без предложения PARTITION BY перемещает все данные в один раздел, поэтому это особенно полезно, если у вас большой набор данных.

Если вы не против использовать API разработчиков, вы можете можно попробовать RDDFunctions.sliding, но это требует ручной обработки.

import org.apache.spark.mllib.rdd.RDDFunctions._

val first = rdd.first match {
  case NameValue(name, value) => NameValueWithLag(name, value, value)
}

sc.parallelize(Seq(first)).union(rdd
  .sliding(2)
  .map(a => NameValueWithLag(a(1).name, a(1).value, a(1).value - a(0).value)))

Случайным образом разбивает этот RDD с предоставленными весами

Самый простой подход - преобразовать RDD в фрейм данных и выполните операцию, а затем преобразуйте его в RDD.

Dataframe в RDD

До Spark 2.0, spark_df.map будет псевдонимом spark_df.rdd.map(). В Spark 2.0 вы должны явно вызвать .rdd first spark_df.rdd.map().

...