Я пытаюсь создать новый столбец в моем кадре данных Spark на основе:
предыдущего значения этого столбца (т.е. нового значенияв столбце основаны на значениях над ним, которые, в свою очередь, основаны на ...)
очень сложное условное утверждение (24 различных условия) в зависимости от значений других столбцов(и запаздывающее значение самой переменной)
Например, что-то вроде логики в этом цикле:
for row, i in df:
if row.col1 == "a":
row.col4 = row.col1 + row.col3
row.col5 = 11
if row.col1 == "b":
if row.col3 == 1:
row.col4 = lag(row.col4) + row.col1 + row.col2
row.col5 = 14
if row.col3 == 0:
row.col4 = lag(row.col4) + row.col1 + row.col3)
row.col5 = 17
if row.col1 == "d":
if row.col3 == 1:
row.col4 = 99
row.col5 = 19
if lag(row.col4) == 99:
row.col4 = lag(row.col4) + row.col5
row.col5 = etc...
(... плюс другое21 возможное значение c
и d
)
Пример
Я хочу преобразовать это:
w = Window.orderBy(col("col1").asc())
df = spark.createDataFrame([
("a", 2, 0),
("b", 3, 1),
("b", 4, 0),
("d", 5, 1),
("e", 6, 0),
("f", 7, 1)
], ["col1", "col2","col3"])
+----+----+----+
|col1|col2|col3|
+----+----+----+
| a| 2| 0|
| b| 3| 1|
| b| 4| 0|
| d| 5| 1|
| e| 6| 0|
| f| 7| 1|
+----+----+----+
... в это:
+----+----+----+--------+-----------------------------------------------------+-----+---------------------------+
|col1|col2|col3|col4 >(explanation) |col5 >(also uses complex logic) |
+----+----+----+--------+-----------------------------------------------------+-----+---------------------------+
| a| 2| 0|a0 >(because (col1==a) ==> col1+col3) |11 > |
| b| 3| 1|a0b3 >(because (col1==b & col3==1) ==> lag(col4)+col1+col2)|14 > |
| b| 4| 0|a0b3b0 >(because (col1==b & col3==0) ==> lag(col4)+col1+col3)|17 > |
| d| 5| 1|99 >(because (col1==d) ==> 99) |19 > |
| e| 6| 0|9919 >(because (lag(col4)==99) ==> lag(col4)+col5 |e6 > |
| f| 7| 1|etc... >etc... |etc..>etc... |
+----+----+----+--------+-----------------------------------------------------+-----+---------------------------+
Это вообще возможно в Spark? Ничто из того, что я пробовал, не сработало:
- Я не нашел способа передать выходные данные UDF обратно в следующий расчет UDF
- Условное + самоссылка делаетсохранение предыдущих значений во временных столбцах в принципе невозможно.
- Я пытался использовать гигантские предложения
when
, но меня смущали ссылки на лаговые значения самого столбца в операторе withColumn()
. Другая проблема с подходом when()
+ lag()
состоит в том, что другие переменные ссылаются на переменную с задержкой, а переменная с задержкой ссылается на другие переменные. (другими словами, в каждую строку подается только одно значение с запаздыванием, но это значение по-разному взаимодействует с другими переменными в зависимости от условий, встречающихся в этой строке.