Разнесите массив структур в столбцы в pyspark - PullRequest
0 голосов
/ 12 октября 2019

Я бы хотел разбить массив структур на столбцы (как определено в полях структуры). Например,

    root
 |-- news_style_super: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- sbox_ctr: double (nullable = true)
 |    |    |    |-- wise_ctr: double (nullable = true)

Должен быть преобразован в

|-- name: string (nullable = true)
|-- sbox_ctr: double (nullable = true)
|-- wise_ctr: double (nullable = true)

Как я могу это сделать?

Ответы [ 2 ]

0 голосов
/ 15 октября 2019
def get_final_dataframe(pathname, df):
cur_names = pathname.split(".")
if len(cur_names) > 1:
    root_name = cur_names[0]
    delimiter = "."
    new_path_name = delimiter.join(cur_names[1:len(cur_names)])

    for field in df.schema.fields:
        if field.name == root_name:
            if type(field.dataType) == ArrayType:
                return get_final_dataframe(pathname, df.select(explode(root_name).alias(root_name)))
            elif type(field.dataType) == StructType:
                if hasColumn(df, delimiter.join(cur_names[0:2])):
                    return get_final_dataframe(new_path_name, df.select(delimiter.join(cur_names[0:2])))
                else:
                    return -1, -1
            else:
                return -1, -1

else:
    root_name = cur_names[0]
    for field in df.schema.fields:
        if field.name == root_name:
            if type(field.dataType) == StringType:
                return df, "string"
            elif type(field.dataType) == LongType:
                return df, "numeric"
            elif type(field.dataType) == DoubleType:
                return df, "numeric"
            else:
                return df, -1

return -1, -1

тогда вы можете

key = "a.b.c.name"
# key = "context.content_feature.tag.name"
df2, field_type = get_final_dataframe(key, df1)
0 голосов
/ 12 октября 2019

, когда я делаю это, он не может работать

 dff = df1.select("context.content_feature.news_style_super")
 print dff.printSchema()
 df2 = dff.select(explode("name").alias("tmp")) .select("tmp.*")

Traceback (последний вызов был последним): файл "/tmp/zeppelin_pyspark-1808705980431719035.py", строка 367, в повышении Exception (traceback. format_exc ()) Исключение: трассировка (последний вызов был последним): файл "/tmp/zeppelin_pyspark-1808705980431719035.py", строка 355, в файле exec (code, _zcUserQueryNameSpace) "", строка 141, в файле "/ home / work/lxc/interpreter/spark/pyspark/pyspark.zip/pyspark/sql/dataframe.py ", строка 984, в файле выбора jdf = self._jdf.select (файл self._jcols (* cols))" / home / work /lxc / интерпретатор / spark / pyspark / py4j-0.10.4-src.zip / py4j / java_gateway.py ", строка 1133, в вызов ответ, self.gateway_client, self.target_id, self.name)Файл "/home/work/lxc/interpreter/spark/pyspark/pyspark.zip/pyspark/sql/utils.py", строка 69, в деко-рейсе AnalysisException (s.split (':', 1) [1],stackTrace) AnalysisException: вы "не можете разрешить" name 'заданные входные столбцы: [news_style_super] ;; \ n'Project [explode (' name) AS tmp # 1559] \ n + - Project [context # 1411.content_feature.news_style_super AS news_style_super # 1556] \ n + - GlobalLimit 1 \ n + - LocalLimit 1 \ n + - Отношение [контекст # 1411, gr_context # 1412, request_feature # 1413, sequence_feature # 1414, session_feature # 1415, sv_session_user_feature # 1417, user_recommend_feature # 1418, vertical_user_feature # 1419] json \ n "

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...