Как перенести Scala DataFrame в Python и наоборот, не регистрируя представление? - PullRequest
0 голосов
/ 13 сентября 2018

У меня есть библиотека, написанная на Scala, в которой есть некоторые функции для загрузки данных из ряда форматов данных измерений и выполнения некоторых вычислений.Все эти функции работают на Scala-версии DataFrame.

. Теперь я хочу использовать эти библиотеки в коде Python с PySpark.Я написал несколько вспомогательных объектов (поскольку моя библиотека использует множество имплицитов в объектах пакетов), чтобы помочь вызвать вещи из Python, и у меня получилось что-то вроде этого: (spark - это Python SparkSession)

sdf = spark._jvm.com.mycompany.PyApiFooHelper.loadFooMeasuringData(spark._jsparkSession, "hdfs:///some/where")
sCalcResult = spark._jvm.com.mycompany.PyApiBarCalculationHelper.doBarCalculation(sdf)

Когда я хочу sCalcResult в Python, я в настоящее время делаю это, как описано в этом вопросе, , регистрируя временное представление и внося его в:

sCalcResult.createOrReplaceTempView("sCalcResult")
calcResult = spark._wrapped.table("sCalcResult")

Однако я нахожуэто ужасно, потому что это может привести к конфликтам имен при «сокрытии этого» для моих коллег-исследователей данных, потому что функции должны каждый раз создавать временные таблицы.Или я генерирую случайные имена таблиц, но тогда у меня могут быть тонны таблиц, которые больше не нужны через некоторое время.

Итак, есть ли такая функция:

pythonDataFrame = scalaToPythonDataframe(scalaDataFrame)

В Python DF есть поле _jdf для получения Java / Scala DF, так что сойтись во временном представлении нельзя, или это так?

РЕДАКТИРОВАТЬ: В настоящее время я использую Spark 2.3.

1 Ответ

0 голосов
/ 27 марта 2019

Я посмотрел исходный код Spark и нашел решение.

A DataFrame имеет конструктор с двумя аргументами, который принимает ссылку на JVM DF и SQLContext в своем варианте Python.

SQLContext получается из SparkSession через его поле _wrapped или из другого DataFrame через его поле sql_ctx.

Итак, это делается так:

from pyspark.sql import DataFrame

# Use Spark DataSource API instead of explicit method
df = spark.read.format("com.mycompany.formats.foo").load("hdfs:///some/where")
sCalcResult = spark._jvm.com.mycompany.PyApiBarCalculationHelper.doBarCalculation(df._jdf)
barCalcResult = DataFrame(sCalcResult, spark._wrapped)
...