Ошибка при использовании объекта Scala в PySpark - PullRequest
0 голосов
/ 08 апреля 2020

Я планирую использовать Scala Объект в Pyspark. Ниже приведен код в Scala

package za.co.absa.cobrix.spark.cobol.utils

import org.apache.spark.sql.{Column, DataFrame}
import scala.annotation.tailrec
import scala.collection.mutable

object SparkUtils {

  def flattenSchema(df: DataFrame, useShortFieldNames: Boolean = false): DataFrame = {
   val fields = new mutable.ListBuffer[Column]()
   val stringFields = new mutable.ListBuffer[String]()
   val usedNames = new mutable.HashSet[String]()
 }
}

Ссылка на Github: https://github.com/AbsaOSS/cobrix/blob/f95efdcd5f802b903404162313f5663bf5731a83/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/SparkUtils.scala

Я только что скопировал несколько строк метода flattenSchema ()

Spark-код в Scala:

import za.co.absa.cobrix.spark.cobol.utils.SparkUtils
val dfFlattened = SparkUtils.flattenSchema(df)

Я попытался вызвать тот же метод flattenSchema () в PySpark после импорта jar-файла в spark-submit

    dfflatten = DataFrame(sparkContext._jvm.za.co.absa.cobrix.spark.cobol.utils.SparkUtils.flattenSchema(df._jdf),sqlContext)

Но получило сообщение об ошибке :

df = sparkCont._jvm.za.co.absa.cobrix.spark.cobol.utils.SparkUtils.flattenSchema(df._jdf)
File "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p3544.1321029/lib/spark2/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py",
line 1257, in __call__   File
"/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p3544.1321029/lib/spark2/python/lib/pyspark.zip/pyspark/sql/utils.py",
line 63, in deco   File
"/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p3544.1321029/lib/spark2/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 332, in get_return_value py4j.protocol.Py4JError: An error
 occurred while calling
z:za.co.absa.cobrix.spark.cobol.utils.SparkUtils.flattenSchema. Trace:
py4j.Py4JException: Method flattenSchema([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:339)
    at py4j.Gateway.invoke(Gateway.java:276)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

Пожалуйста, помогите!

1 Ответ

2 голосов
/ 11 апреля 2020

Вы забыли объявить объект на Scala, чтобы часть Python могла его найти. Примерно так:

package za.co.absa.cobrix.spark.cobol.utils

import org.apache.spark.sql.{Column, DataFrame}
import scala.annotation.tailrec
import scala.collection.mutable

object SparkUtils {
  def flattenSchema(df: DataFrame, useShortFieldNames: Boolean): DataFrame = {
   val fields = new mutable.ListBuffer[Column]()
   val stringFields = new mutable.ListBuffer[String]()
   val usedNames = new mutable.HashSet[String]()
  }
}

ВАЖНО: Также старайтесь не использовать перегрузку метода (или параметры по умолчанию, которые фактически приводят к перегрузке метода или другим трюкам ниже) ... это будет сложно перевести (и использовать его на стороне Python).

ПРИМЕЧАНИЕ: чтобы преодолеть отсутствие значений по умолчанию, просто явно передайте значение из части Python, и все готово, в этом случае просто дополнительный логический. Кроме того, вы можете создать значение по умолчанию на стороне Python, это безопаснее и полезно (особенно если у вас много вызывающих точек).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...