Итерация (l oop) на уровне пользователя в DF-кадре PySpark DF - PullRequest
0 голосов
/ 21 февраля 2020

Представьте, что у меня есть следующий DF в PySpark, где UB и LB означают верхнюю и нижнюю границу, соответственно.

+---------+-----+--------------+------+------+
| user_id | row | currentValue |  UB  |  LB  |
+---------+-----+--------------+------+------+
| usr001  |   1 |           12 | 7.2  | 16.8 |
| usr001  |   2 |           20 | 12   | 28   |
| usr001  |   3 |           17 | 10.2 | 23.8 |
| usr001  |   4 |           21 | 12.6 | 29.4 |
| usr001  |   5 |            9 | 5.4  | 12.6 |
| usr001  |   6 |           23 | 13.8 | 32.2 |
| usr002  |   1 |           11 | 6.6  | 15.4 |
| usr002  |   2 |           10 | 6    | 14   |
| usr002  |   3 |           15 | 9    | 21   |
| usr002  |   4 |            3 | 1.8  | 4.2  |
| usr002  |   5 |            4 | 2.4  | 5.6  |
+---------+-----+--------------+------+------+

Для каждого пользователя в DF я хотел бы применить некоторые логику / правила так что currentValue может быть обновлено до updatedValue. Логика / правила следующие:

user_id: usr001

  1. Для строки 1: currentValue = updatedValue (для всех пользователей)
  2. Для строки 2: если значение currentValue находится в пределах LB и UB в строке 1 (если 20 находится между 7,2 и 16,8), затем updatedValue в строке 2 равно currentValue в строке 1 (updatedValue в строке 2 = 12). Иначе, updatedValue = currentValue (updatedValue = 20)
  3. Поскольку в строке 2 updatedValue = currentValue, строка 3 сравнивается со строкой 2. Для строки 3: если currentValue находится в пределах LB и UB в строке 2 (если 17 между 12 и 28) затем updatedValue в строке 3 равно currentValue в строке 2 (updatedValue в строке 3 = 20). Иначе, updatedValue = currentValue (updatedValue = 17)
  4. Поскольку в строке 3 updatedValue в строке 3 = currentValue в строке 2, строка 4 сравнивается со строкой 2. Для строки 4: если currentValue находится в пределах LB и UB в строка 2 (если 21 находится между 12 и 28), тогда updatedValue в строке 4 равно currentValue в строке 2 (updatedValue в строке 4 = 20). Иначе, updatedValue = currentValue (updatedValue = 21)
  5. Поскольку в строке 4 updatedValue в строке 4 = currentValue в строке 2, строка 5 сравнивается со строкой 2. Для строки 5: если currentValue находится в пределах LB и UB в строка 2 (если 9 находится между 12 и 28), тогда updatedValue в строке 5 равно currentValue в строке 2 (updatedValue в строке 5 = 20). Иначе, updatedValue = currentValue (updatedValue = 9)
  6. Поскольку в строке 5 updatedValue = currentValue, строка 6 сравнивается со строкой 5. Для строки 6: если currentValue находится в пределах LB и UB в строке 5 (если 23 равно между 5.4 и 12.6) затем updatedValue в строке 6 равно currentValue в строке 2 (updatedValue в строке 5 = 20). Иначе, updatedValue = currentValue (updatedValue = 9)

Точные правила будут применяться для usr002. Ожидаемый результат будет следующим:

+---------+-----+--------------+------+------+--------------+
| user_id | row | currentValue |  UB  |  LB  | updatedValue |
+---------+-----+--------------+------+------+--------------+
| usr001  |   1 |           12 | 7.2  | 16.8 |           12 |
| usr001  |   2 |           20 | 12   | 28   |           20 |
| usr001  |   3 |           17 | 10.2 | 23.8 |           20 |
| usr001  |   4 |           21 | 12.6 | 29.4 |           20 |
| usr001  |   5 |            9 | 5.4  | 12.6 |            9 |
| usr001  |   6 |           23 | 13.8 | 32.2 |           23 |
| usr002  |   1 |           11 | 6.6  | 15.4 |           11 |
| usr002  |   2 |           10 | 6    | 14   |           11 |
| usr002  |   3 |           15 | 9    | 21   |           11 |
| usr002  |   4 |            3 | 1.8  | 4.2  |            3 |
| usr002  |   5 |            4 | 2.4  | 5.6  |            3 |
+---------+-----+--------------+------+------+--------------+

Есть ли способ, которым это может быть реализовано в Spark? Я ценю любую помощь!

Spark: 2.4.4

1 Ответ

4 голосов
/ 22 февраля 2020

Вы можете использовать оконные функции. Но это не так просто. Вот пошаговое объяснение кода и логики c.

(в приведенных ниже кодах и объяснении значения uv и updatedValue не совпадают)

1.Читать df

df=spark.read.csv(path, header=True, inferSchema=True)

2. Укажите окно

w=Window.partitionBy("user_id").orderBy("row")

3.Создать столбец, который сравнивает текущее значение только с UB и LB предыдущей строки, если он находится в диапазоне, затем возвращает предыдущую строку currentValue, иначе возвращает ту же строку currentValue, давайте назовем этот столбец "uv"

df2=df.withColumn("uv",when(col("row")==1,col("currentValue"))
.when(col("currentValue").between(lag("UB",1).over(w),                                          
lag("LB",1).over(w)),lag("currentValue",1).over(w))
.otherwise(col("currentValue"))).orderBy("user_id")

df2:

+-------+---+------------+----+----+---+
|user_id|row|currentValue|  UB|  LB| uv|
+-------+---+------------+----+----+---+
| usr001|  1|          12| 7.2|16.8| 12|
| usr001|  2|          20|12.0|28.0| 20|
| usr001|  3|          17|10.2|23.8| 20|
| usr001|  4|          21|12.6|29.4| 17|
| usr001|  5|           9| 5.4|12.6|  9|
| usr001|  6|          23|13.8|32.2| 23|
| usr002|  1|          11| 6.6|15.4| 11|
| usr002|  2|          10| 6.0|14.0| 11|
| usr002|  3|          15| 9.0|21.0| 15|
| usr002|  4|           3| 1.8| 4.2|  3|
| usr002|  5|           4| 2.4| 5.6|  3|
+-------+---+------------+----+----+---+

4. Это основные логи c, согласно вашим логи c для строки 5 (usr001). Сначала мы должны проверить, заполнена ли строка 4 updatedValue row4 currentValue, если он заполнен, тогда сравните значение строки 5 с границами строки 4, иначе нам нужно перейти к строке, из которой строка 4 updatedValue заполнена, и сравнить с этими границами, чтобы реализовать это на предыдущем шаге, отметив все значения, где currentValue == uv .

df3=df2.withColumn("comp_row", when(col("currentValue")==col("uv"), col("row")))

df3:

+-------+---+------------+----+----+---+--------+
|user_id|row|currentValue|  UB|  LB| uv|comp_row|
+-------+---+------------+----+----+---+--------+
| usr001|  1|          12| 7.2|16.8| 12|       1|
| usr001|  2|          20|12.0|28.0| 20|       2|
| usr001|  3|          17|10.2|23.8| 20|    null|
| usr001|  4|          21|12.6|29.4| 17|    null|
| usr001|  5|           9| 5.4|12.6|  9|       5|
| usr001|  6|          23|13.8|32.2| 23|       6|
| usr002|  1|          11| 6.6|15.4| 11|       1|
| usr002|  2|          10| 6.0|14.0| 11|    null|
| usr002|  3|          15| 9.0|21.0| 15|       3|
| usr002|  4|           3| 1.8| 4.2|  3|       4|
| usr002|  5|           4| 2.4| 5.6|  3|    null|
+-------+---+------------+----+----+---+--------+

5. Теперь, если мы заполним нулевые значения каждой строки, мы получим номер строки, с которой должна сравниваться каждая строка.

df4 = df3.withColumn("comp_row",last("comp_row",True).over(w))

df4:

+-------+---+------------+----+----+---+--------+
|user_id|row|currentValue|  UB|  LB| uv|comp_row|
+-------+---+------------+----+----+---+--------+
| usr001|  1|          12| 7.2|16.8| 12|       1|
| usr001|  2|          20|12.0|28.0| 20|       2|
| usr001|  3|          17|10.2|23.8| 20|       2|
| usr001|  4|          21|12.6|29.4| 17|       2|
| usr001|  5|           9| 5.4|12.6|  9|       5|
| usr001|  6|          23|13.8|32.2| 23|       6|
| usr002|  1|          11| 6.6|15.4| 11|       1|
| usr002|  2|          10| 6.0|14.0| 11|       1|
| usr002|  3|          15| 9.0|21.0| 15|       3|
| usr002|  4|           3| 1.8| 4.2|  3|       4|
| usr002|  5|           4| 2.4| 5.6|  3|       4|
+-------+---+------------+----+----+---+--------+

Примечание: значения o f comp_row указывает, с какой строкой должна сравниваться следующая строка, например: строка 4 (usr001) comp_row содержит 2, это означает, что строка 5 сравнивается со строкой 2.

6. Теперь мы знаем, какая строка сравнивается с какой строкой, все нам нужно просто получить границы этих строк. Для этого нам нужно объединить строку с comp_row, чтобы мы могли получить границы строки 2 в строке 4.

df5 = df4.select("user_id",col("row").alias("comp_row"),
col("UB").alias("new_UB"),col("LB").alias("new_LB")
,col("currentValue").alias("new_currentValue")) 
# Note: Here row is selected as comp_row.

df6=df5.join(df4,["user_id","comp_row"],"inner").orderBy("user_id","row")

df6.select("user_id",
"UB","LB"
,"new_UB","new_LB"
,"currentValue","new_currentValue"
,"row","comp_row").show()

+-------+----+----+------+------+------------+----------------+---+--------+
|user_id|  UB|  LB|new_UB|new_LB|currentValue|new_currentValue|row|comp_row|
+-------+----+----+------+------+------------+----------------+---+--------+
| usr001| 7.2|16.8|   7.2|  16.8|          12|              12|  1|       1|
| usr001|12.0|28.0|  12.0|  28.0|          20|              20|  2|       2|
| usr001|10.2|23.8|  12.0|  28.0|          17|              20|  3|       2|
| usr001|12.6|29.4|  12.0|  28.0|          21|              20|  4|       2|
| usr001| 5.4|12.6|   5.4|  12.6|           9|               9|  5|       5|
| usr001|13.8|32.2|  13.8|  32.2|          23|              23|  6|       6|
| usr002| 6.6|15.4|   6.6|  15.4|          11|              11|  1|       1|
| usr002| 6.0|14.0|   6.6|  15.4|          10|              11|  2|       1|
| usr002| 9.0|21.0|   9.0|  21.0|          15|              15|  3|       3|
| usr002| 1.8| 4.2|   1.8|   4.2|           3|               3|  4|       4|
| usr002| 2.4| 5.6|   1.8|   4.2|           4|               3|  5|       4|
+-------+----+----+------+------+------------+----------------+---+--------+

7. Последний шаг и Boom !!, сравнить значения тока с новыми границами в предыдущая строка, если она находится в границах, тогда updatedValue = new_currentValue предыдущей строки, в противном случае updatedValue = currentValue той же строки.

df7=df6.withColumn("updatedValue",when(col("row")==1,col("currentValue"))\
.when(col("currentValue").between(lag("new_UB",1).over(w),                                               
lag("new_LB",1).over(w)),lag("new_currentValue",1).over(w))             
.otherwise(col("currentValue"))).orderBy("user_id")\
.select("user_id","currentValue","UB","LB","updatedValue")

df7:

+-------+------------+----+----+------------+
|user_id|currentValue|  UB|  LB|updatedValue|
+-------+------------+----+----+------------+
| usr001|          12| 7.2|16.8|          12|
| usr001|          20|12.0|28.0|          20|
| usr001|          17|10.2|23.8|          20|
| usr001|          21|12.6|29.4|          20|
| usr001|           9| 5.4|12.6|           9|
| usr001|          23|13.8|32.2|          23|
| usr002|          11| 6.6|15.4|          11|
| usr002|          10| 6.0|14.0|          11|
| usr002|          15| 9.0|21.0|          11|
| usr002|           3| 1.8| 4.2|           3|
| usr002|           4| 2.4| 5.6|           3|
+-------+------------+----+----+------------+
...