Как преобразовать массив строк в массив struct с условиями - PullRequest
1 голос
/ 09 февраля 2020

У меня есть фрейм данных pyspark с одним столбцом _c0.

a|b|c|clm4=1|clm5=3
a|b|c|clm4=9|clm6=60|clm7=23

Я пытаюсь преобразовать его в фрейм данных выбранных столбцов, например

clm1,clm2,clm3,clm4,clm6,clm7,clm8
a,    b,   c,   1,  null,null,null
a,    b,   c,   9,   60,  23, null

Обратите внимание, что я удалил clm5 и добавил clm8.

Я использую следующий код:

transform_expr = """
    transform(split(_c0, '[|]'), (x, i) -> 
                       struct(
                             IF(x like '%=%', substring_index(x, '=', 1), concat('_c0', i+1)), 
                             substring_index(x, '=', -1)
                             )
            )
    """


    df = df.select("_c0",  explode(map_from_entries(expr(transform_expr))).alias("col_name", "col_value")).groupby("_c0").pivot('col_name').agg(first('col_value')).drop("_c0")  

Проблема в том, что у меня есть несколько огромных файлов, над которыми я хочу выполнить это действие, и результат каждого файла должен содержат те же столбцы (которые также являются длинным списком), которые могут иметь нулевые значения, если они отсутствуют во входном файле. Как добавить условие в приведенный выше код, чтобы выбрать только те столбцы, которые присутствуют в списке имен столбцов?

1 Ответ

1 голос
/ 09 февраля 2020

Вы можете иметь нужные столбцы в списке и использовать их для фильтрации преобразованного массива:

column_list = ["clm1", "clm2", "clm3", "clm4", "clm6", "clm7", "clm8"]

Теперь добавьте этот фильтр после шага преобразования, используя функцию filter:

column_filter = ','.join(f"'{c}'" for c in column_list)

transform_expr = f"""
            filter(transform(split(_c0, '[|]'), (x, i) -> 
                               struct(
                                     IF(x like '%=%', substring_index(x, '=', 1), concat('clm', i+1)) as name, 
                                     substring_index(x, '=', -1) as value
                                     )
                    ), x -> x.name in ({column_filter}))
            """

Это отфильтрует все столбцы, которых нет в списке.

И, наконец, добавьте отсутствующие столбцы как пустые, используя выражение простого выбора:

df = df.select("_c0",  explode(map_from_entries(expr(transform_expr))).alias("col_name", "col_value")).groupby("_c0").pivot('col_name').agg(first('col_value')).drop("_c0")

## add missing columns as nulls
final_columns = [col(c).alias(c) if c in df.columns else lit(None).alias(c) for c in column_list]

df.select(*final_columns).show()

#+----+----+----+----+----+----+----+
#|clm1|clm2|clm3|clm4|clm6|clm7|clm8|
#+----+----+----+----+----+----+----+
#|   a|   b|   c|   9|  60|  23|null|
#|   a|   b|   c|   1|null|null|null|
#+----+----+----+----+----+----+----+
...