Pyspark конвертировать строки в JSON с нулями - PullRequest
0 голосов
/ 28 ноября 2018

Цель: Для кадра данных со схемой

id:string
Cold:string
Medium:string
Hot:string
IsNull:string
annual_sales_c:string
average_check_c:string
credit_rating_c:string
cuisine_c:string
dayparts_c:string
location_name_c:string
market_category_c:string
market_segment_list_c:string
menu_items_c:string
msa_name_c:string
name:string
number_of_employees_c:string
number_of_rooms_c:string
Months In Role:integer
Tenured Status:string
IsCustomer:integer
units_c:string
years_in_business_c:string
medium_interactions_c:string
hot_interactions_c:string
cold_interactions_c:string
is_null_interactions_c:string

Я хочу добавить новый столбец, представляющий собой строку JSON всех ключей и значений для столбцов.В этом посте я использовал подход PySpark - конвертировать в JSON построчно и связанные с этим вопросы.Мой код

df = df.withColumn("JSON",func.to_json(func.struct([df[x] for x in small_df.columns])))

У меня есть одна проблема:

Проблема: Когда в любой строке есть нулевое значение для столбца (а в моих данных много ...) строка Json не содержит ключа.Т.е. если только 9 из 27 столбцов имеют значения, то строка JSON имеет только 9 ключей ... Я хотел бы сохранить все ключи, но для значений NULL просто передать пустую строку ""

Любые советы?

1 Ответ

0 голосов
/ 28 ноября 2018

Вы можете просто изменить ответ на вопрос, который вы связали, используя pyspark.sql.functions.when.

Рассмотрите следующий пример DataFrame:

data = [
    ('one', 1, 10),
    (None, 2, 20),
    ('three', None, 30),
    (None, None, 40)
]

sdf = spark.createDataFrame(data, ["A", "B", "C"])
sdf.printSchema()
#root
# |-- A: string (nullable = true)
# |-- B: long (nullable = true)
# |-- C: long (nullable = true)

Использованиеwhen для реализации логики if-then-else .Используйте столбец, если он не нулевой.В противном случае вернуть пустую строку.

from pyspark.sql.functions import col, to_json, struct, when, lit
sdf = sdf.withColumn(
    "JSON",
    to_json(
        struct(
           [
                when(
                    col(x).isNotNull(),
                    col(x)
                ).otherwise(lit("")).alias(x) 
                for x in sdf.columns
            ]
        )
    )
)
sdf.show()
#+-----+----+---+-----------------------------+
#|A    |B   |C  |JSON                         |
#+-----+----+---+-----------------------------+
#|one  |1   |10 |{"A":"one","B":"1","C":"10"} |
#|null |2   |20 |{"A":"","B":"2","C":"20"}    |
#|three|null|30 |{"A":"three","B":"","C":"30"}|
#|null |null|40 |{"A":"","B":"","C":"40"}     |
#+-----+----+---+-----------------------------+

Другой вариант - использовать pyspark.sql.functions.coalesce вместо when:

from pyspark.sql.functions import coalesce

sdf.withColumn(
    "JSON",
    to_json(
        struct(
           [coalesce(col(x), lit("")).alias(x) for x in sdf.columns]
        )
    )
).show(truncate=False)
## Same as above
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...