Создание нового столбца с использованием сложных условных и отстающих собственных ссылок в Spark - PullRequest
2 голосов
/ 29 октября 2019

Я пытаюсь создать новый столбец в моем кадре данных Spark на основе:

  1. предыдущего значения этого столбца (т.е. нового значенияв столбце основаны на значениях над ним, которые, в свою очередь, основаны на ...)

  2. очень сложное условное утверждение (24 различных условия) в зависимости от значений других столбцов(и запаздывающее значение самой переменной)

Например, что-то вроде логики в этом цикле:

for row, i in df:
    if row.col1 == "a":
        row.col4 = row.col1 + row.col3
        row.col5 = 11
    if row.col1 == "b":
        if row.col3 == 1:
            row.col4 = lag(row.col4) + row.col1 + row.col2
            row.col5 = 14
        if row.col3 == 0:
            row.col4 = lag(row.col4) + row.col1 + row.col3)
            row.col5 = 17
    if row.col1 == "d":
        if row.col3 == 1:
            row.col4 = 99
            row.col5 = 19
    if lag(row.col4) == 99:
        row.col4 = lag(row.col4) + row.col5
        row.col5 = etc...

(... плюс другое21 возможное значение c и d)

Пример

Я хочу преобразовать это:

w = Window.orderBy(col("col1").asc())

df = spark.createDataFrame([
    ("a", 2, 0),
    ("b", 3, 1),
    ("b", 4, 0),
    ("d", 5, 1),
    ("e", 6, 0),
    ("f", 7, 1)
], ["col1", "col2","col3"])

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   a|   2|   0|
|   b|   3|   1|
|   b|   4|   0|
|   d|   5|   1|
|   e|   6|   0|
|   f|   7|   1|
+----+----+----+

... в это:

+----+----+----+--------+-----------------------------------------------------+-----+---------------------------+
|col1|col2|col3|col4    >(explanation)                                        |col5 >(also uses complex logic)  |
+----+----+----+--------+-----------------------------------------------------+-----+---------------------------+
|   a|   2|   0|a0      >(because (col1==a) ==> col1+col3)                    |11   >                           |
|   b|   3|   1|a0b3    >(because (col1==b & col3==1) ==> lag(col4)+col1+col2)|14   >                           |
|   b|   4|   0|a0b3b0  >(because (col1==b & col3==0) ==> lag(col4)+col1+col3)|17   >                           |
|   d|   5|   1|99      >(because (col1==d) ==> 99)                           |19   >                           |
|   e|   6|   0|9919    >(because (lag(col4)==99) ==> lag(col4)+col5          |e6   >                           |
|   f|   7|   1|etc...  >etc...                                               |etc..>etc...                     |
+----+----+----+--------+-----------------------------------------------------+-----+---------------------------+

Это вообще возможно в Spark? Ничто из того, что я пробовал, не сработало:

  • Я не нашел способа передать выходные данные UDF обратно в следующий расчет UDF
  • Условное + самоссылка делаетсохранение предыдущих значений во временных столбцах в принципе невозможно.
  • Я пытался использовать гигантские предложения when, но меня смущали ссылки на лаговые значения самого столбца в операторе withColumn(). Другая проблема с подходом when() + lag() состоит в том, что другие переменные ссылаются на переменную с задержкой, а переменная с задержкой ссылается на другие переменные. (другими словами, в каждую строку подается только одно значение с запаздыванием, но это значение по-разному взаимодействует с другими переменными в зависимости от условий, встречающихся в этой строке.

1 Ответ

3 голосов
/ 03 ноября 2019

Если вы в порядке с UDF, тогда все просто (я просто скопировал ваши условия в приведенный ниже код). Для решения без UDF это зависит от того, как столбцы lag отображаются в условии if, если только вы не можете предоставить примеры с более или самым сложным условием if, я бы сказал, что UDF - это самый простой способ,

примечание: следующее работает только тогда, когда данные могут быть правильно разделены, например, должен быть один или несколько столбцов, которые можно использовать для группировки, и, таким образом, все связанные строки всеобрабатываться в одном разделе.

from pyspark.sql.functions import udf, lit, collect_list, struct

@udf('array<struct<col1:string,col2:int,col3:int,col4:string,col5:string>>')
def gen_col(rows):
  new_rows = []
  for row in sorted(rows, key=lambda x: x.col2):
    if row.col1 == 'a':
        col4 = row.col1 + str(row.col3)
        col5 = '11'
    elif row.col1 == "b":
        if row.col3 == 1:
            col4 = col4 + row.col1 + str(row.col2)
            col5 = '14'
        if row.col3 == 0:
            col4 = col4 + row.col1 + str(row.col3)
            col5 = '17'
    elif row.col1 == "d":
        if row.col3 == 1:
            col4 = '99'
            col5 = '19'
    elif col4 == '99':
        col4 = col4 + col5
        col5 = row.col1 + str(row.col2)
    else:
        col4 = None
        col5 = None
    new_rows.append(dict(col4=col4, col5=col5, **row.asDict()))
  return new_rows

df.groupby(lit(1)) \
  .agg(gen_col(collect_list(struct(df.columns))).alias('new')) \
  .selectExpr('inline(new)') \
  .show()
+----+----+----+------+----+
|col1|col2|col3|  col4|col5|
+----+----+----+------+----+
|   a|   2|   0|    a0|  11|
|   b|   3|   1|  a0b3|  14|
|   b|   4|   0|a0b3b0|  17|
|   d|   5|   1|    99|  19|
|   e|   6|   0|  9919|  e6|
|   f|   7|   1|  null|null|
+----+----+----+------+----+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...