Применить логику для определенного столбца в кадре данных в искре - PullRequest
0 голосов
/ 29 августа 2018

У меня есть датафрейм, и он был импортирован из mysql

dataframe_mysql.show()
+----+---------+-------------------------------------------------------+
|  id|accountid|                                                xmldata|
+----+---------+-------------------------------------------------------+
|1001|    12346|<AccountSetup xmlns:xsi="test"><Customers test="test...|
|1002|    12346|<AccountSetup xmlns:xsi="test"><Customers test="test...|
|1003|    12346|<AccountSetup xmlns:xsi="test"><Customers test="test...|
|1004|    12347|<AccountSetup xmlns:xsi="test"><Customers test="test...|
+----+---------+-------------------------------------------------------+

В столбце xmldata есть теги xml внутри, мне нужно проанализировать его в структурированных данных в отдельном фрейме данных.

Ранее у меня был только один xml-файл в текстовом файле, и я загружал его в фрейм данных spark, используя "com.databricks.spark.xml"

 spark-shell --packages com.databricks:spark-xml_2.10:0.4.1, 
 com.databricks:spark-csv_2.10:1.5.0

 val sqlContext = new org.apache.spark.sql.SQLContext(sc)

 val df = sqlContext.read.format("com.databricks.spark.xml")
 .option("rowTag","Account").load("mypath/Account.xml")

окончательный результат, который я получил как структурированный

df.show ()

 +----------+--------------------+--------------------+--------------+--------------------+-------+....
    |   AcctNbr|         AddlParties|           Addresses|ApplicationInd|       Beneficiaries|ClassCd|....
    +----------+--------------------+--------------------+--------------+--------------------+-------+....
    |AAAAAAAAAA|[[Securities Amer...|[WrappedArray([D,...|             T|[WrappedArray([11...|     35|....
    +----------+--------------------+--------------------+--------------+--------------------+-------+....

Пожалуйста, посоветуйте, как этого добиться, когда у меня есть контент xml внутри фрейма данных.

Ответы [ 2 ]

0 голосов
/ 20 сентября 2018

Я попробовал следующий запрос

val dff1 = Seq(
Data(1001, 12345, "<AccountSetup xmlns:xsi=\"test\"><Customers test=\"a\">d</Customers></AccountSetup>"),
Data(1002, 12345, "<AccountSetup xmlns:xsi=\"test\"><Customers test=\"b\">e</Customers></AccountSetup>"),
Data(1003, 12345, "<AccountSetup xmlns:xsi=\"test\"><Customers test=\"c\">f</Customers></AccountSetup>")
    ).toDF

    dff1.show()
    val reader = new XmlReader().withRowTag("AccountSetup")
    val xmlrdd = dff1.select("xmldata").map(a => a.getString(0)).rdd
    xmlrdd.toDF("newRowXml").show()
    val xmldf = reader.xmlRdd(sqlcontext, xmlrdd)
    xmldf.show()

Я получил вывод для dff1.show () и xmlrdd.toDF ("newRowXml"). Show ()

//dff1.show()
+----+---------+--------------------+
|  id|accountid|             xmldata|
+----+---------+--------------------+
|1001|    12345|<AccountSetup xml...|
|1002|    12345|<AccountSetup xml...|
|1003|    12345|<AccountSetup xml...|
+----+---------+--------------------+

xmlrdd.toDF("newRowXml").show()
+--------------------+
|           newRowXml|
+--------------------+
|<AccountSetup xml...|
|<AccountSetup xml...|
|<AccountSetup xml...|
+--------------------+
18/09/20 19:30:29 INFO SparkUI: Stopped Spark web UI at http://192.168.56.1:4040
18/09/20 19:30:29 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
18/09/20 19:30:29 INFO MemoryStore: MemoryStore cleared
18/09/20 19:30:29 INFO BlockManager: BlockManager stopped
18/09/20 19:30:29 INFO BlockManagerMaster: BlockManagerMaster stopped
18/09/20 19:30:29 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
18/09/20 19:30:29 INFO SparkContext: Successfully stopped SparkContext
18/09/20 19:30:29 INFO ShutdownHookManager: Shutdown hook called
18/09/20 19:30:29 INFO ShutdownHookManager: Deleting directory C:\Users\rajkiranu\AppData\Local\Temp\spark-16433b5e-01b7-472b-9b88-fea0a67a991a

Process finished with exit code 1

не может видеть xmldf.show ()

0 голосов
/ 29 августа 2018

Поскольку вы пытаетесь вытянуть столбец данных XML в отдельный DataFrame, вы все равно можете использовать код из пакета spark-xml. Вам просто нужно использовать их читатель напрямую.

case class Data(id: Int, accountid: Int, xmldata: String)
val df = Seq(
    Data(1001, 12345, "<AccountSetup xmlns:xsi=\"test\"><Customers test=\"a\">d</Customers></AccountSetup>"),
    Data(1002, 12345, "<AccountSetup xmlns:xsi=\"test\"><Customers test=\"b\">e</Customers></AccountSetup>"),
    Data(1003, 12345, "<AccountSetup xmlns:xsi=\"test\"><Customers test=\"c\">f</Customers></AccountSetup>")
).toDF


import com.databricks.spark.xml.XmlReader

val reader = new XmlReader()

// Set options using methods
reader.withRowTag("AccountSetup")

val rdd = df.select("xmldata").map(r => r.getString(0)).rdd
val xmlDF = reader.xmlRdd(spark.sqlContext, rdd)

Однако UDF, как полагает филантроверт, с пользовательским синтаксическим анализом XML, вероятно, будет более чистым в долгосрочной перспективе. Ссылочная ссылка для читателя класса здесь

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...