Преобразование Spark Scala в Pyspark - PullRequest
0 голосов
/ 21 апреля 2020

Я пытаюсь преобразовать фрагмент искры Scala в Pyspark, который извлекает данные из XML. Работает нормально в scala. Я запутался и мне нужна ваша помощь в преобразовании этого фрагмента в Pyspark (в основном библиотеки импорта).

import org.apache.spark.sql._
import com.databricks.spark.xml._;
import org.apache.spark.sql.functions._
import java.sql.Timestamp
import java.time.Instant
import java.text.SimpleDateFormat
sql("""CREATE TEMPORARY FUNCTION numeric_range AS 'brickhouse.udf.collect.NumericRange'""")
sql("""CREATE TEMPORARY FUNCTION array_index AS 'brickhouse.udf.collect.ArrayIndexUDF'""")



val sqlContext = new org.apache.spark.sql.SQLContext(sc) ;


var df = sqlContext.read.format("com.databricks.spark.xml").option("rowTag","Region").load("<path of xml>")
val schema = df.schema
var parse_df = sqlContext.read.format("com.databricks.spark.xml").option("rowTag","Region").schema(schema).load("<path of xml>")

parse_df.registerTempTable("complete_df")

sql(s"""select current_timestamp() as load_dttm,
 actioncode,cast(regexp_replace(substr(creationTime,1,19),"T"," ") as timestamp) as start_dttm,n as array_index,array_index(Area,n) as Area from complete_df lateral view numeric_range(size(Area)) n1 as n""").registerTempTable("main_table_1")

Мои усилия:

pyspark --packages com.databricks:spark-xml_2.10:0.4.1,com.databricks:spark-csv_2.10:1.5.0,com.klout:brickhouse:0.6.0

from pyspark.sql.session import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

sql("CREATE TEMPORARY FUNCTION numeric_range AS 'brickhouse.udf.collect.NumericRange'")
sql("CREATE TEMPORARY FUNCTION array_index AS 'brickhouse.udf.collect.ArrayIndexUDF'")

df=sqlContext.read.format("xml").option("rowTag","Region").load("/tmp/test/Store_Site_edited.xml")
schema = df.schema
parse_df = sqlContext.read.format("xml").option("rowTag","Region").schema(schema).load("/tmp/test/Store_Site_edited.xml")
parse_df.registerTempTable("complete_df")

Я застрял в следующем утверждении, пока работает

sqlContext.sql(s"""select current_timestamp() as load_dttm,actioncode,n as array_index,array_index(Area,n) as Location from complete_df lateral view numeric_range(size(Area)) n1 as n""").registerTempTable("main_table_1")

также пытался

sql(s"select current_timestamp() as load_dttm,actioncode,n as array_index,array_index(Area,n) as Location from complete_df lateral view numeric_range(size(Area)) n1 as n").registerTempTable("main_table_1")

выдает ошибку

enter image description here

...