Заполнить нулевые значения следующим увеличивающимся числом | PySpark | Python - PullRequest
1 голос
/ 26 февраля 2020

У меня ниже dataframe со значениями. Я хочу добавить следующий идентификатор в столбце, который должен быть уникальным, а также увеличивающимся по природе.

+----------------+----+--------------------+
|local_student_id|  id|        last_updated|
+----------------+----+--------------------+
|          610931|null|                null|
|          599768|   3|2020-02-26 15:47:...|
|          633719|null|                null|
|          612949|   2|2020-02-26 15:47:...|
|          591819|   1|2020-02-26 15:47:...|
|          595539|   4|2020-02-26 15:47:...|
|          423287|null|                null|
|          641322|   5|2020-02-26 15:47:...|
+----------------+----+--------------------+

Я хочу ниже ожидаемого результата. кто-нибудь может меня обуздать? Я новичок в Pyspark. и также хочу добавить текущую метку времени в столбце last_updated.

+----------------+----+--------------------+
|local_student_id|  id|        last_updated|
+----------------+----+--------------------+
|          610931|   6|2020-02-26 16:00:...|
|          599768|   3|2020-02-26 15:47:...|
|          633719|   7|2020-02-26 16:00:...|
|          612949|   2|2020-02-26 15:47:...|
|          591819|   1|2020-02-26 15:47:...|
|          595539|   4|2020-02-26 15:47:...|
|          423287|   8|2020-02-26 16:00:...|
|          641322|   5|2020-02-26 15:47:...|
+----------------+----+--------------------+

на самом деле я пытался

final_data = final_data.withColumn(
        'id', when(col('id').isNull(), row_number() + max(col('id'))).otherwise(col('id')))

, но выдает следующую ошибку: -

: org.apache.spark.sql.AnalysisException: grouping expressions sequence is empty, and '`local_student_id`' is not an aggregate function. Wrap '(CASE WHEN (`id` IS NULL) THEN (CAST(row_number() AS BIGINT) + max(`id`)) ELSE `id` END AS `id`)' in windowing function(s) or wrap '`local_student_id`' in first() (or first_value) if you don't care which value you get.;;

1 Ответ

3 голосов
/ 26 февраля 2020

вот код, который вам нужен:

from pyspark.sql import functions as F, Window

max_id = final_data.groupBy().max("id").collect()[0][0]

final_data.withColumn(
    "id",
    F.coalesce(
        F.col("id"),
        F.row_number().over(Window.orderBy("id")) + F.lit(max_id)
    )
).withColumn(
    "last_updated",
    F.coalesce(
        F.col("last_updated"),
        F.current_timestamp()
    )
)

...