Вы можете просто изменить ответ на вопрос, который вы связали, используя pyspark.sql.functions.when
.
Рассмотрите следующий пример DataFrame:
data = [
('one', 1, 10),
(None, 2, 20),
('three', None, 30),
(None, None, 40)
]
sdf = spark.createDataFrame(data, ["A", "B", "C"])
sdf.printSchema()
#root
# |-- A: string (nullable = true)
# |-- B: long (nullable = true)
# |-- C: long (nullable = true)
Использованиеwhen
для реализации логики if-then-else .Используйте столбец, если он не нулевой.В противном случае вернуть пустую строку.
from pyspark.sql.functions import col, to_json, struct, when, lit
sdf = sdf.withColumn(
"JSON",
to_json(
struct(
[
when(
col(x).isNotNull(),
col(x)
).otherwise(lit("")).alias(x)
for x in sdf.columns
]
)
)
)
sdf.show()
#+-----+----+---+-----------------------------+
#|A |B |C |JSON |
#+-----+----+---+-----------------------------+
#|one |1 |10 |{"A":"one","B":"1","C":"10"} |
#|null |2 |20 |{"A":"","B":"2","C":"20"} |
#|three|null|30 |{"A":"three","B":"","C":"30"}|
#|null |null|40 |{"A":"","B":"","C":"40"} |
#+-----+----+---+-----------------------------+
Другой вариант - использовать pyspark.sql.functions.coalesce
вместо when
:
from pyspark.sql.functions import coalesce
sdf.withColumn(
"JSON",
to_json(
struct(
[coalesce(col(x), lit("")).alias(x) for x in sdf.columns]
)
)
).show(truncate=False)
## Same as above