проблема переноса данных в pyspark - PullRequest
0 голосов
/ 11 апреля 2020

У меня есть следующий входной фрейм данных с соответствующими столбцами:

dim1,dim2,tran_clmn,input1,input2
101,201,Y1,1,2
102,202,Y2,2,3
103,203,Y3,3,4
104,204,Y4,4,5
105,205,Y5,5,6

Мне нужно транспонировать входные данные в приведенный ниже транспонированный вывод на основе столбца tran_clmn

dim1,dim2,new_trn_clm,Y1,Y2,Y3,Y4,Y5
101,201,input1,1,,,,
101,201,input2,2,,,,
102,202,input1,,2,,,
102,202,input2,,3,,,
103,203,input1,,,3,,
103,203,input2,,,4,,
104,204,input1,,,,4,
104,204,input2,,,,5,
105,205,input1,,,,,5
105,205,input2,,,,,6

как добиться такого сценария? нет возможности агрегирования. можно ли это сделать и получить результат, используя groupBy и метод pivot ?

1 Ответ

0 голосов
/ 11 апреля 2020

Два года go, я нашел в этой записи функцию, которая выполняет такое преобразование (от широкого к длинному). Это было очень полезно для меня: может быть, это может помочь и вам.

import pyspark.sql.functions as psf

def melt(df: DataFrame,
         id_vars: Iterable[str], value_vars: Iterable[str],
         var_name: str = "variable", value_name: str = "value"):
    """
    Convert :class:`DataFrame` from wide to long format.
    """
    # Create array<struct<variable: str, value: ...>>
    _vars_and_vals = psf.array(*(
        psf.struct(psf.lit(c).alias(var_name), psf.col(c).alias(value_name))
        for c in value_vars))
    # Add to the DataFrame and explode
    _tmp = df.withColumn("_vars_and_vals", psf.explode(_vars_and_vals))
    cols = id_vars + [
        psf.col("_vars_and_vals")[x].alias(x) for x in [var_name, value_name]]
    return _tmp.select(*cols)

В вашем случае это может быть что-то вроде этого (не проверено):

melt(df, id_vars=['dim1','dim2'],
                       value_vars=[input1','input2'],
                       var_name="tran_clmn", value_name="tran_clmn")
...