Передача нескольких фреймов данных между Scala Spark и PySpark - PullRequest
0 голосов
/ 16 мая 2019

Я пытаюсь передать и преобразовать несколько фреймов данных из Scala в Pyspark. когда я пробую его для одного фрейма данных, я могу прочитать выходные данные, используя .show (), но для нескольких фреймов данных я не могу разделить фреймы данных и, следовательно, не могу преобразовать

Я попытался напечатать один кадр данных, перейдя от scala spark к pyspark. это работает отлично, я даже смог преобразовать его в панд. когда я сделал то же самое, передав два кадра данных, я не смог разделить их. я получаю данные в виде типа «py4j.java_gateway.JavaMember», в этом объекте есть два кадра данных, и если я смогу их разделить, я смогу преобразовать кадр данных pyspark, как сделал это с одним кадром данных scala

вот код скалы

import org.apache.spark.SparkContext
import org.apache.spark.sql.{SparkSession, DataFrame}

object scala_py{
def getInputDF:(DataFrame,DataFrame)={

val spark = SparkSession.builder.getOrCreate()
import spark.implicits._
val SourceTableDataframe = spark.sql("select * from <table name>")
val data=spark.sql("SELECT * FROM <table name>")
return (SourceTableDataframe,data)
}}

это код pyspark:

from pyspark import StorageLevel, SparkFiles
from pyspark.sql import SparkSession, DataFrame, SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import udf

spark = SparkSession \
    .builder \
    .appName("PySpark using Scala example") \
    .getOrCreate()
sqlContext = spark._wrapped
sc = spark._sc
scala_out=sc._jvm.com.accenture.asset.scala_py

df1,df2=scala_out.getInputDF()
df=DataFrame(df1,sqlContext)

Я ожидаю, что оба моих фрейма данных scala (SourceTableDataframe, data) будут передаваться в фреймы данных pyspark как df1 и df2.

теперь я получаю объект py4j.java_gateway.JavaMember, который не смог обработать.

когда я возвращаю один фрейм данных, я получаю тот же тип данных, но я могу преобразовать его в фрейм данных pyspark, когда я пытаюсь вернуть два, я не могу получить то, что мне нужно

1 Ответ

1 голос
/ 16 мая 2019

Возвращаемый вами объект - scala.Tuple2, эквивалентный:

dfs = sc._jvm.scala.Tuple2(spark._jsparkSession.range(1), spark._jsparkSession.range(2))

который не имеет сопоставления py4j. Поэтому вы должны обработать его вручную

df1, df2 = DataFrame(dfs._1(), sqlContext), DataFrame(dfs._2(), sqlContext)
...