предполагается, что каждая строка в наборе данных имеет формат json строковый формат
import pyspark.sql.functions as F
def drop_null_cols(data):
import json
content = json.loads(data)
for key, value in list(content.items()):
if value is None:
del content[key]
return json.dumps(content)
drop_null_cols_udf = F.udf(drop_null_cols, F.StringType())
df = spark.createDataFrame(
["{\"name\":\"Ranga\", \"age\":25, \"city\":\"Hyderabad\"}",
"{\"name\":\"John\", \"age\":null, \"city\":\"New York\"}",
"{\"name\":null, \"age\":31, \"city\":\"London\"}"],
"string"
).toDF("data")
df.select(
drop_null_cols_udf("data").alias("data")
).show(10,False)
Если входной фрейм данных имеет столбцы, а выходные данные должны быть не пустыми столбцами json
df = spark.createDataFrame(
[('Ranga', 25, 'Hyderabad'),
('John', None, 'New York'),
(None, 31, 'London'),
],
['name', 'age', 'city']
)
df.withColumn(
"data", F.to_json(F.struct([x for x in df.columns]))
).select(
drop_null_cols_udf("data").alias("data")
).show(10, False)
#df.write.format("csv").save("s3://path/to/file/) -- save to s3
, что дает
+-------------------------------------------------+
|data |
+-------------------------------------------------+
|{"name": "Ranga", "age": 25, "city": "Hyderabad"}|
|{"name": "John", "city": "New York"} |
|{"age": 31, "city": "London"} |
+-------------------------------------------------+