Как я могу добавить столбец в DataFrame, который группирует строки в куски N? Как NTILE, но с фиксированным размером ковша - PullRequest
0 голосов
/ 16 января 2020

Скажем, у меня есть DataFrame вроде:

+------------+-----------+-----+
|        feed|artist     |count|
+------------+-----------+-----+
|           y| Kanye West|    9|
|           y|  Radiohead|    6|
|           y|     Zero 7|    3|
|           y| Puts Marie|    1|
|        gwas|       Drax|    7|
|        gwas|    Calibre|    4|
|        gwas| Aphex Twin|    1|
|        gwas|      Jay Z|    1|
|           x|   DJ Krush|    2|
|           x|  Titeknots|    1|
+------------+-----------+-----+

Я хочу добавить новый столбец, который разбивает строки на сегменты N строк для каждого раздела (feed).

Мне кажется, что это инверсия NTILE. NTILE позволяет вам выбрать количество блоков, но вместо этого я хочу выбрать размер блока.

Вот желаемый результат. Обратите внимание, как каждый feed разбивается на группы по N = 2, включая ленту x, в которой есть только один фрагмент из 2 строк. ( Редактировать: каждый раздел упорядочен по count, поэтому группа 1 в каждом разделе будет представлять собой строки с наибольшим значением для count)

+------------+-----------+-----+-----+
|        feed|artist     |count|group|
+------------+-----------+-----+-----+
|           y| Kanye West|    1|    9|
|           y|  Radiohead|    1|    6|
|           y|     Zero 7|    1|    3|
|           y| Puts Marie|    1|    1|
|        gwas|       Drax|    7|    7|
|        gwas|    Calibre|    1|    4|
|        gwas| Aphex Twin|    1|    1|
|        gwas|      Jay Z|    8|    1|
|           x|   DJ Krush|    2|    2|
|           x|  Titeknots|    1|    1|
+------------+-----------+-----+-----+

Как Бонус, я хотел бы, чтобы каждое ведро было разного размера. Например, List(2, 2, 4, 10, 10, -1) будет означать, что в первом сегменте 2 строк, во втором 2 строк, в третьем 4 строк и т. Д. c., А в последнем блоке (-1) содержится остаток .

РЕДАКТИРОВАТЬ

(Еще один полезный вариант)

При реализации ответов я понял, что есть еще один вариант, который я бы предпочел:

Добавление столбца в DataFrame, который разбивает его строки на группы по N, не зная размера DataFrame.

Пример:

Если N = 100 и DataFrame имеет 800 строк, он разбивает его на 8 сегментов по 100. Если DataFrame имеет 950 строк, он разбивает его на 9 сегментов по 100 и 1 сегментов по 50. Он должен не требуется сканирование / вызов .count().

Примерные кадры данных аналогичны приведенным выше.

(meta: мне задать новый вопрос для этого варианта? Я чувствую, что «NTILE с фиксированным размером сегмента» - более элегантная проблема, и, вероятно, более распространенная, чем моя или исходный вариант использования)

Ответы [ 2 ]

2 голосов
/ 17 января 2020

Если я правильно вас понимаю, это можно сделать с помощью выражения SQL:

import org.apache.spark.sql.functions.{expr,row_number,desc}
import org.apache.spark.sql.expressions.Window

// set up WindowSpec
val w1 = Window.partitionBy("feed").orderBy(desc("count"))

val L = List(2, 2, 4, 10, 10, -1)

// dynamically create SQL expression from the List `L` to map row_number into group-id
var sql_expr = "CASE"
var running_total = 0
for(i <- 1 to L.size) {
  running_total += L(i-1)
  sql_expr += (if(L(i-1) > 0) s" WHEN rn <= $running_total THEN $i " else s" ELSE $i END")
}
println(sql_expr)
//CASE  WHEN rn <= 2 THEN 1  WHEN rn <= 4 THEN 2  WHEN rn <= 8 THEN 3  WHEN rn <= 18 THEN 4  WHEN rn <= 28 THEN 5  ELSE 6 END 

val df_new = df.withColumn("rn", row_number().over(w1)).withColumn("group", expr(sql_expr)).drop("rn")
df_new.show
+----+----------+-----+-----+
|feed|    artist|count|group|
+----+----------+-----+-----+
|gwas|      Drax|    7|    1|
|gwas|   Calibre|    4|    1|
|gwas|Aphex Twin|    1|    2|
|gwas|     Jay Z|    1|    2|
|   x|  DJ Krush|    2|    1|
|   x| Titeknots|    1|    1|
|   y|Kanye West|    9|    1|
|   y| Radiohead|    6|    1|
|   y|    Zero 7|    3|    2|
|   y|Puts Marie|    1|    2|
+----+----------+-----+-----+

Для фиксированного N просто приведите (row_number-1)/N + 1 к int:

val N = 2
val df_new = df.withColumn("group", ((row_number().over(w1)-1)/N+1).cast("int"))
1 голос
/ 16 января 2020

Это может сработать:

val bucketDef =  List(2, 2, 4, 10, 10)

val bucketRunsum = bucketDef.scanLeft(1)( _ + _) // calc running sum

// maps a row-number to a bucket
val indexBucketMapping = bucketRunsum.zip(bucketRunsum.tail)
  .zipWithIndex
  .map{case ((start,end),index) => ((start,end),index+1)} // make index start at 1

// gives List(((1,3),1), ((3,5),2), ((5,9),3), ((9,19),4), ((19,29),5))

// udf to assign a bucket to a given row-number
val calcBucket = udf((rnb:Long) => indexBucketMapping
  .find{case ((start,end),_) => start<=rnb && rnb < end}
  .map(_._2) // get index
  .getOrElse(indexBucketMapping.last._2+1) // is in last bucket
)

df
  .withColumn("group",calcBucket(row_number().over(Window.partitionBy($"feed").orderBy($"count"))))

в качестве альтернативы (без UDF) создать DataFrame, который отображает номер строки в корзину, а затем объединить

val bucketSizeDef =List(2, 2, 4, 10, 10)
val bucketDef =  (1 +: bucketSizeDef).zipWithIndex.map{case (bs,index) => (bs,index+1)}
    .toDF("bucketSize","group")
    .withColumn("i",sum($"bucketSize").over(Window.orderBy($"group")))
    .withColumn("i_to",coalesce(lead($"i",1).over(Window.orderBy($"group")),lit(Long.MaxValue)))
    .drop($"bucketSize")

bucketDef.show()

:

+-----+---+-------------------+
|group|  i|               i_to|
+-----+---+-------------------+
|    1|  1|                  3|
|    2|  3|                  5|
|    3|  5|                  9|
|    4|  9|                 19|
|    5| 19|                 29|
|    6| 29|9223372036854775807|
+-----+---+-------------------+

затем присоединитесь к df:

df
  .withColumn("rnb",row_number().over(Window.partitionBy($"feed").orderBy($"count")))
  .join(broadcast(bucketDef),$"rnb">= $"i" and $"rnb"< $"i_to")
  .drop("rnb","i","i_to")
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...