Итерация строк данных pyspark и применение UDF - PullRequest
0 голосов
/ 25 июня 2019

У меня есть датафрейм, который выглядит так: partitionCol orderCol valueCol

+--------------+----------+----------+
| partitionCol | orderCol | valueCol |
+--------------+----------+----------+
| A            | 1        | 201      |
| A            | 2        | 645      |
| A            | 3        | 302      |
| B            | 1        | 335      |
| B            | 2        | 834      |
+--------------+----------+----------+

Я хочу сгруппировать по partitionCol, а затем внутри каждого раздела перебрать строки, упорядоченные по orderCol, и применить некоторую функцию для вычисления нового столбца на основе valueCol и кэшированного значения. например,

def foo(col_value, cached_value):
    tmp = <some value based on a condition between col_value and cached_value>
    <update the cached_value using some logic>
    return tmp

Я понимаю, что мне нужно сгруппировать partitionCol и применить UDF, который будет работать с каждым каналом в отдельности, но изо всех сил пытается найти хороший способ перебирать строки и применять логику, которую я описал, чтобы получить желаемый результат:

+--------------+----------+----------+---------------+
| partitionCol | orderCol | valueCol | calculatedCol -
+--------------+----------+----------+---------------+
| A            | 1        | 201      | C1            |
| A            | 2        | 645      | C1            |
| A            | 3        | 302      | C2            |
| B            | 1        | 335      | C1            |
| B            | 2        | 834      | C2            |
+--------------+----------+----------+---------------+

1 Ответ

0 голосов
/ 25 июня 2019

Я думаю, что лучший способ сделать это - применить UDF ко всему набору данных:

# first, you create a struct with the order col and the valu col
df = df.withColumn("my_data", F.struct(F.col('orderCol'), F.col('valueCol'))

# then you create an array of that new column 
df = df.groupBy("partitionCol").agg(F.collect_list('my_data').alias("my_data")

# finaly, you apply your function on that array
df = df.withColumn("calculatedCol", my_udf(F.col("my_data"))

Но не зная точно, что вы хотите сделать, это все, что я могу предложить,

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...