Как использовать Spark для копирования из XML на SQL Server - PullRequest
0 голосов
/ 27 марта 2019

Мне нужно открыть и скопировать содержимое нескольких файлов XML, хранящихся в хранилище данных Azure, в базу данных SQL Azure.Это структура файла XML:

<?xml version="1.0" encoding="utf-8"?>
<FileSummary xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xsi:noNamespaceSchemaLocation="invoices.xsd">
      <Header>
      <SequenceNumber>1</SequenceNumber>
      <Description>Hello</Description>
      <ShipDate>20180101</ShipDate>
     </Header>
     <FileInvoices>
      <InvoiceNumber>000000A</InvoiceNumber>
      <InvoiceHeader>
       <InvoiceHeaderDate>201800201</InvoiceHeaderDate>
       <InvoiceHeaderDescription>XYZ</InvoiceHeaderDescription>
      </InvoiceHeader>
      <InvoiceItems>
       <ItemId>000001</ItemId>
       <ItemQuantity>000010</ItemQuantity>
       <ItemPrice>000100</ItemPrice>
      </InvoiceItems>
     </FileInvoices>
     <FileInvoices>
      <InvoiceNumber>000000B</InvoiceNumber>
      <InvoiceHeader>
       <InvoiceHeaderDate>201800301</InvoiceHeaderDate>
       <InvoiceHeaderDescription>ABC</InvoiceHeaderDescription>
      </InvoiceHeader>
      <InvoiceItems>
       <ItemId>000002</ItemId>
       <ItemQuantity>000020</ItemQuantity>
       <ItemPrice>000200</ItemPrice>
      </InvoiceItems>
     </FileInvoices>
</FileSummary>

Поэтому я использовал блоки данных Azure для монтирования хранилища данных как "/ mnt / testdata", а затем попытался открыть файл примера выше с помощью следующей команды

dfXml = (sqlContext.read.format("xml") # requires maven library <HyukjinKwon:spark-xml:0.1.1-s_2.11>
         .options(rootTag='FileSummary')
         .load('/mnt/testdata/data/invoices_file1.xml')) 
dfXml.cache()
print ("Number of records in this dataframe: " + str(dfXml.count())) 

dfXml.printSchema()

возвращает следующий результат:

dfXml:pyspark.sql.dataframe.DataFrame
FileInvoices:array
element:struct
InvoiceHeader:struct
InvoiceHeaderDate:long
InvoiceHeaderDescription:string
InvoiceItems:struct
ItemId:long
ItemPrice:long
ItemQuantity:long
InvoiceNumber:string
Header:struct
Description:string
SequenceNumber:long
ShipDate:long
xmlns:xsi:string
xsi:noNamespaceSchemaLocation:string
Number of records in this dataframe: 1
root
 |-- FileInvoices: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- InvoiceHeader: struct (nullable = true)
 |    |    |    |-- InvoiceHeaderDate: long (nullable = true)
 |    |    |    |-- InvoiceHeaderDescription: string (nullable = true)
 |    |    |-- InvoiceItems: struct (nullable = true)
 |    |    |    |-- ItemId: long (nullable = true)
 |    |    |    |-- ItemPrice: long (nullable = true)
 |    |    |    |-- ItemQuantity: long (nullable = true)
 |    |    |-- InvoiceNumber: string (nullable = true)
 |-- Header: struct (nullable = true)
 |    |-- Description: string (nullable = true)
 |    |-- SequenceNumber: long (nullable = true)
 |    |-- ShipDate: long (nullable = true)
 |-- xmlns:xsi: string (nullable = true)
 |-- xsi:noNamespaceSchemaLocation: string (nullable = true)

Таким образом, похоже, что приведенная выше команда правильно читает файл и, конечно, я могу подключиться к моей хорошо нормализованной базе данных SQL Azure и записатьзаписи в конкретные таблицы:

dfXml.write.jdbc(url=jdbcUrl, table="dest_table", mode="overwrite", properties=connectionProperties)

, однако этот метод требует установки нескольких вложенных циклов и множества ручных задач для отслеживания ключей каждой таблицы и соблюдения ссылочной целостности, которые не используют архитектуру Spark, поэтому яТеперь мне интересно, есть ли лучшая практика (или готовая библиотека), которая делает эту задачу более автоматизированной и масштабируемой.

Я ожидаю, что это общая потребность, поэтому в идеале я бы использовал библиотеку, которая считывает полную структуру XML, показанную в начале, и автоматически извлекает информацию для вставки в нормализованные таблицы.

Большое спасибо за любое предложение.

Мауро

Ответы [ 3 ]

0 голосов
/ 28 марта 2019

Я использую spark-shell для выполнения ниже. Я считаю, что структура xml повторяется. Вам нужно создать / ссылаться на одну схему, связанную с XML-файлом. И вы могли бы использовать Brickhouse UDF JAR. то

1. Создайте функцию, как показано ниже

sql(""" create temporary function numeric_range as brickhouse.udf.collect.NumericRange""")

2.Используйте схему

var df=sqlContext.read.format("com.databricks.spark.xml").option("rowTag","FileSummary").load("location of schema file")

val schema=df.schema

3. var df1=sqlContext.read.format("com.databricks.spark.xml").option("rowTag","FileSummary").schema(schema).load("location of actual xml file")

df1.registerTempTable("XML_Data")

4.Вы должны сгладить FileInvoices, как показано ниже

val df2=sql("select array_index(FileInvoices,n) as FileInvoices from XML_Data lateral view numeric_range(size(FileInvoices))n1 as n""").registerTempTable("xmlData2")

Как только каждый преобразован в Структуру, его легче пройти или использовать взорвать , используя FileInvoices.InvoiceHeader.InvoiceHeaderDate

val jdbcUsername = "<username>"
val jdbcPassword = "<password>"
val jdbcHostname = "<hostname>" //typically, this is in the form or servername.database.windows.net
val jdbcPort = 1433
val jdbcDatabase ="<database>"

val jdbc_url = s"jdbc:sqlserver://${jdbcHostname}:${jdbcPort};database=${jdbcDatabase};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=60;"

val connectionProperties = new Properties()
connectionProperties.put("user", s"${jdbcUsername}")
connectionProperties.put("password", s"${jdbcPassword}")

spark.table("").write.jdbc(jdbc_url, "xmlData2", connectionProperties)
0 голосов
/ 30 марта 2019

Спасибо, Субаш, Ананд. Что касается ответа Субаша, у меня нет файла схемы, поэтому я изменил его шаг 2, заменив «расположение фактического XML-файла» на «местоположение фактического XML-файла», и он действительно работает: после шага 3, если я просто запустите

df2=sql("select * from XML_Data")

и тогда я бегу

from pyspark.sql.functions import explode
df3=df2.withColumn("FileInvoices", explode(df2.FileInvoices))
display(df3)

В результате он реплицирует одну и ту же структуру заголовка в несколько строк, где в столбце FileInvoices у меня есть отдельная структура счетов: в разобранном виде FileInvoices

Похоже, я приближаюсь к своей конечной цели, однако я все еще не могу автоматизировать создание записей в правильном порядке, чтобы избежать нарушения ссылочной целостности.

Но прежде чем сделать это, я был бы признателен за ваш отзыв.

Еще раз спасибо,

Mauro

0 голосов
/ 28 марта 2019

В зависимости от того, что вы пытаетесь сделать, и как выглядит структура вашей таблицы.Я предполагаю, что вы пытаетесь обработать много файлов, используя спарк.А также вы хотите загрузить данные в разные нормализованные таблицы

Например, вы можете записать заголовки в одну таблицу, заголовок-> fileInvoices - отношение 1 ко многим, так что это может быть другая таблица.

  • Когда вы читаете несколько XML-файлов, используя load (имя файла * .xml), вы также хотите сделать FileSummary тегом строки.Тогда у вас будет несколько строк в кадре данных, по одной на каждую файловую сумму.

  • Вы можете выбрать столбцы заголовков в другом кадре данных и записать их в таблицу.

  • FileInvoices - это массив структур, который вы можете разбить на несколько строк и сохранить их в другой таблице.

  • Далее, если каждый счет-фактура может содержать несколько элементов, вы можете сделать еще один разрыв, чтобы сделатьих в строки и сохранить их в другой таблице

Или вы можете сделать два разнесения и загрузить результирующий кадр данных в одну большую денормализованную таблицу.

Вот статья о том, каквзорвать работы https://hadoopist.wordpress.com/2016/05/16/how-to-handle-nested-dataarray-of-structures-or-multiple-explodes-in-sparkscala-and-pyspark/

...