искра получает минимальное значение в столбце, которое удовлетворяет условию - PullRequest
0 голосов
/ 19 ноября 2018

У меня есть DataFrame в spark, который выглядит следующим образом:

id |  flag
----------
 0 |  true
 1 |  true
 2 | false
 3 |  true
 4 |  true
 5 |  true
 6 | false
 7 | false
 8 |  true
 9 | false

Я хочу получить другой столбец с текущим rowNumber, если он имеет flag == false, или rowNumber следующего ложного значения, поэтомувывод будет выглядеть следующим образом:

id |  flag | nextOrCurrentFalse
-------------------------------
 0 |  true |                  2
 1 |  true |                  2
 2 | false |                  2
 3 |  true |                  6
 4 |  true |                  6
 5 |  true |                  6
 6 | false |                  6
 7 | false |                  7
 8 |  true |                  9
 9 | false |                  9

Я хочу сделать это векторизованным способом (без итерации по строке).Поэтому я хочу, чтобы логика была такой:

  • Для каждой строки получите минимальный идентификатор, больший или равный текущему rowNum с флагом == false

Ответы [ 3 ]

0 голосов
/ 20 ноября 2018

См. Другой ответ, который лучше, но оставил это здесь, так что для образовательных целей SQL - возможно.

Это делает то, что вы хотите, но мне было бы интересно узнать, что другие думают об этом в масштабе.Я собираюсь проверить Catalyst и посмотреть, как он работает процедурно, но я думаю, что это может означать некоторые промахи на границах разделов, я также хочу это проверить.

import org.apache.spark.sql.functions._
val df = Seq((0, true), (1, true), (2,false), (3, true), (4,true), (5,true), (6,false), (7,false), (8,true), (9,false)).toDF("id","flag")
df.createOrReplaceTempView("tf") 

// Performance? Need to check at some stage how partitioning works in such a case.
spark.sql("CACHE TABLE tf") 
val res1 = spark.sql("""  
                       SELECT tf1.*, tf2.id as id2, tf2.flag as flag2
                         FROM tf tf1, tf tf2
                        WHERE tf2.id  >= tf1.id
                          AND tf2.flag = false 
                     """)    

//res1.show(false)
res1.createOrReplaceTempView("res1") 
spark.sql("CACHE TABLE res1") 

val res2 = spark.sql(""" SELECT X.id, X.flag, X.id2 
                           FROM (SELECT *, RANK() OVER (PARTITION BY id ORDER BY id2 ASC) as rank_val 
                                   FROM res1) X
                          WHERE X.rank_val = 1
                       ORDER BY id
                    """) 

res2.show(false)
0 голосов
/ 20 ноября 2018

Подумав о масштабировании и тому подобном - но неясно, достаточно ли хорош Catalyst, - я предлагаю решение, основанное на одном из ответов, которое может выиграть от разбиения, и для которого гораздо меньше работы - просто думая о данных.Речь идет о предварительных вычислениях и обработке, о том, что некоторые виды массажа могут превзойти методы грубой силы.Ваша точка зрения на JOIN менее важна, так как сейчас это ограниченное JOIN, а не массовая генерация данных.

Ваш комментарий к подходу с фреймами данных немного искажен тем, что все, что здесь превзошло, - это фреймы данных.Я думаю, что вы имеете в виду, что вы хотите перебрать фрейм данных и получить суб-цикл с выходом.Я не могу найти такого примера, и на самом деле я не уверен, что он соответствует парадигме SPARK.Получены те же результаты, но с меньшей обработкой:

import org.apache.spark.sql.functions._
import spark.implicits._
import org.apache.spark.sql.expressions.Window

val df = Seq((0, true), (1, true), (2,false), (3, true), (4,true), (5,true), (6,false), (7,false), (8,true), (9,false)).toDF("id","flag")
@transient val  w1 = org.apache.spark.sql.expressions.Window.orderBy("id1")  

val ids = df.where("flag = false") 
            .select($"id".as("id1"))  

val ids2 = ids.select($"*", lag("id1",1,-1).over(w1).alias("prev_id"))
val ids3 = ids2.withColumn("prev_id1", col("prev_id")+1).drop("prev_id")

// Less and better performance at scale, this is better theoretically for Catalyst to bound partitions? Less work to do in any event.
// Some understanding of data required! And no grouping and min.
val withNextFalse = df.join(ids3, df("id") >= ids3("prev_id1") && df("id") <= ids3("id1"))
                     .select($"id", $"flag", $"id1".alias("nextOrCurrentFalse"))
                     .orderBy(asc("id"),asc("id"))

withNextFalse.show(false)

также возвращает:

+---+-----+------------------+
|id |flag |nextOrCurrentFalse|
+---+-----+------------------+
|0  |true |2                 |
|1  |true |2                 |
|2  |false|2                 |
|3  |true |6                 |
|4  |true |6                 |
|5  |true |6                 |
|6  |false|6                 |
|7  |false|7                 |
|8  |true |9                 |
|9  |false|9                 |
+---+-----+------------------+
0 голосов
/ 20 ноября 2018

Если flag довольно редко, вы можете сделать это следующим образом:

val ids = df.where("flag = false"). 
             select($"id".as("id1"))  

val withNextFalse = df.join(ids, df("id") <= ids("id1")).
                      groupBy("id", "flag").
                      agg("id1" -> "min")

На первом шаге мы создадим фрейм данных идентификаторов, где флаг равен false.Затем мы присоединяем этот фрейм данных к исходным данным в желаемом условии (исходный идентификатор должен быть меньше или равен идентификатору строки, в которой установлен флаг false).

Чтобы получить first в таком случае, сгруппируйте по идентификатору и используйте agg, чтобы найти минимальное значение id1 (которое является идентификатором строки с флагом = false.

Запуск по данным вашего примера (и сортировка поid) дает желаемый результат:

+---+-----+--------+
| id| flag|min(id1)|
+---+-----+--------+
|  0| true|       2|
|  1| true|       2|
|  2|false|       2|
|  3| true|       6|
|  4| true|       6|
|  5| true|       6|
|  6|false|       6|
|  7|false|       7|
|  8| true|       9|
|  9|false|       9|
+---+-----+--------+

Этот подход может привести к проблемам с производительностью, если DataFrame очень большой и имеет много строк, где флаг имеет значение False. Если это так, вам может быть лучше ситерационное решение.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...