Apache Spark: получить первый и последний ряд каждого раздела - PullRequest
0 голосов
/ 20 февраля 2020

Я бы хотел получить первый и последний ряд каждого раздела в spark (я использую pyspark). Как я go об этом? В своем коде я перераспределяю свой набор данных на основе ключевого столбца, используя:

mydf.repartition(keyColumn).sortWithinPartitions(sortKey)

Есть ли способ получить первую и последнюю строку для каждого раздела? Спасибо

Ответы [ 3 ]

1 голос
/ 21 февраля 2020

Я бы настоятельно рекомендовал не работать с разделами напрямую. Spark выполняет большую часть оптимизации DAG, поэтому при попытке выполнить определенные функции c для каждого раздела все ваши предположения о разделах и их распределении могут быть полностью ложными.

Однако, похоже, у вас есть keyColumn и sortKey, поэтому я бы просто предложил сделать следующее:

import pyspark
import pyspark.sql.functions as f

w_asc = pyspark.sql.Window.partitionBy(keyColumn).orderBy(f.asc(sortKey))
w_desc = pyspark.sql.Window.partitionBy(keyColumn).orderBy(f.desc(sortKey))
res_df = mydf. \
 withColumn("rn_asc", f.row_number().over(w_asc)). \
 withColumn("rn_desc", f.row_number().over(w_desc)). \
 where("rn_asc = 1 or rn_desc = 1")

В результирующем фрейме данных будет 2 дополнительных столбца, где rn_asc=1 обозначает первую строку, а rn_desc=1 обозначает последний ряд.

0 голосов
/ 21 февраля 2020

Вот еще один подход, использующий mapPartitions из RDD API. Мы перебираем элементы каждого раздела, пока не достигнем конца. Я ожидаю, что эта итерация будет очень быстрой. Вот код:

df = spark.createDataFrame([
  ["Tom", "a"],
  ["Dick", "b"],
  ["Harry", "c"],
  ["Elvis", "d"],
  ["Elton", "e"],
  ["Sandra", "f"]
], ["name", "toy"])

def get_first_last(it):
      first = last = next(it)
      for last in it:
        pass

      # Attention: if first equals last by reference return only one!
      if first is last:
        return [first]

      return [first, last]

# coalesce here is just for demonstration
first_last_rdd = df.coalesce(2).rdd.mapPartitions(get_first_last)

spark.createDataFrame(first_last_rdd, ["name", "toy"]).show()

# +------+---+
# |  name|toy|
# +------+---+
# |   Tom|  a|
# | Harry|  c|
# | Elvis|  d|
# |Sandra|  f|
# +------+---+

PS: Нечетные позиции будут содержать первый элемент раздела, а четные - последний элемент. Также обратите внимание, что число результатов будет (numPartitions * 2) - numPartitionsWithOneItem, которое, как я ожидаю, будет относительно небольшим, поэтому не стоит беспокоиться о стоимости нового оператора createDataFrame.

0 голосов
/ 21 февраля 2020

Scala: Я думаю, что перераспределение происходит не по ключевому столбцу, но для него требуется целое число, как раздел, который вы хотите установить. Я сделал способ выбрать первую и последнюю строку с помощью функции Window искры.

Во-первых, это мои тестовые данные.

+---+-----+
| id|value|
+---+-----+
|  1|    1|
|  1|    2|
|  1|    3|
|  1|    4|
|  2|    1|
|  2|    2|
|  2|    3|
|  3|    1|
|  3|    3|
|  3|    5|
+---+-----+

Затем я использую Window функционируют дважды, потому что я не могу легко узнать последнюю строку, но обратное довольно легко.

import org.apache.spark.sql.expressions.Window
val a = Window.partitionBy("id").orderBy("value")
val d = Window.partitionBy("id").orderBy(col("value").desc)

val df = spark.read.option("header", "true").csv("test.csv")
df.withColumn("marker", when(rank.over(a) === 1, "Y").otherwise("N"))
  .withColumn("marker", when(rank.over(d) === 1, "Y").otherwise(col("marker")))
  .filter(col("marker") === "Y")
  .drop("marker").show

Окончательный результат равен

+---+-----+
| id|value|
+---+-----+
|  3|    5|
|  3|    1|
|  1|    4|
|  1|    1|
|  2|    3|
|  2|    1|
+---+-----+
...