Как разделить столбец искровых данных в ArrayType (StructType) на несколько столбцов в pyspark? - PullRequest
1 голос
/ 08 ноября 2019

Я читаю xml с использованием блоков данных spark spark xml со схемой ниже. подэлемент X_PAT может встречаться более одного раза, для обработки этого я использовал тип массива (structtype), преобразование ne xt для создания нескольких столбцов из этого одного столбца.

<root_tag>
   <id>fff9</id>
   <X1000>
      <X_PAT>
         <X_PAT01>IC</X_PAT01>
         <X_PAT02>EDISUPPORT</X_PAT02>
         <X_PAT03>TE</X_PAT03>
      </X_PAT>
      <X_PAT>
         <X_PAT01>IC1</X_PAT01>
         <X_PAT02>EDISUPPORT1</X_PAT02>
         <X_PAT03>TE1</X_PAT03>
      </X_PAT>
   </X1000>
</root_tag>
from pyspark.sql import SparkSession
from pyspark.sql.types import *

jar_path = "/Users/nsrinivas/com.databricks_spark-xml_2.10-0.4.1.jar"

spark = SparkSession.builder.appName("Spark - XML read").master("local[*]") \
    .config("spark.jars", jar_path) \
    .config("spark.executor.extraClassPath", jar_path) \
    .config("spark.executor.extraLibrary", jar_path) \
    .config("spark.driver.extraClassPath", jar_path) \
    .getOrCreate()

xml_schema = StructType()
xml_schema.add("id", StringType(), True)
x1000 = StructType([
    StructField("X_PAT",
                ArrayType(StructType([
                    StructField("X_PAT01", StringType()),
                    StructField("X_PAT02", StringType()),
                    StructField("X_PAT03", StringType())]))),
])
xml_schema.add("X1000", x1000, True)

df = spark.read.format("xml").option("rowTag", "root_tag").option("valueTag", False) \
    .load("root_tag.xml", schema=xml_schema)

df.select("id", "X1000.X_PAT").show(truncate=False)

Я получаюВыведите, как показано ниже:

+------------+--------------------------------------------+
|id          |X_PAT                                       |
+------------+--------------------------------------------+
|fff9        |[[IC1, SUPPORT1, TE1], [IC2, SUPPORT2, TE2]]|
+------------+--------------------------------------------+

, но я хочу, чтобы X_PAT был сплющен и создал несколько столбцов, как показано ниже, тогда я переименую столбцы.

+-----+-------+-------+-------+-------+-------+-------+
|id   |X_PAT01|X_PAT02|X_PAT03|X_PAT01|X_PAT02|X_PAT03|
+-----+-------+-------+-------+-------+-------+-------+
|fff9 |IC1    |SUPPORT1|TE1   |IC2   |SUPPORT2|TE2    |
+-----+-------+-------+-------+-------+-------+-------+

тогда я переименую новые столбцыкак показано ниже

id|XPAT_1_01|XPAT_1_02|XPAT_1_03|XPAT_2_01|XPAT_2_02|XPAT_2_03|

Я пытался использовать X1000.X_PAT.*, но он выдает ошибку ниже pyspark.sql.utils.AnalysisException: «Может расширять только типы данных struct star. Атрибут: ArrayBuffer(L_1000A, S_PER); '

Есть идеи, пожалуйста?

1 Ответ

0 голосов
/ 08 ноября 2019

Попробуйте это:

df = spark.createDataFrame([('1',[['IC1', 'SUPPORT1', 'TE1'],['IC2', 'SUPPORT2', 'TE2']]),('2',[['IC1', 'SUPPORT1', 'TE1'],['IC2','SUPPORT2', 'TE2']])],['id','X_PAT01'])

enter image description here

Определить функцию для анализа данных

def create_column(df):
    data = df.select('X_PAT01').collect()[0][0]
    for each_list in range(len(data)):
        for each_item in range(len(data[each_list])):
            df = df.withColumn('X_PAT_'+str(each_list)+'_0'+str(each_item), F.lit(data[each_list][each_item]))
    return df

вызов

df = create_column(df)

выход

enter image description here

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...