Вызов Scala UDF в Pyspark JavaPackage не вызывает ошибку - PullRequest
3 голосов
/ 11 июля 2019

Я пытаюсь использовать scala UDF в pyspark Мой scala udf выглядит следующим образом.

package com.ParseGender
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions._


object ParseGender{
      def testudffunction(s: String): String =  {
          if(List("cis female","f","female","woman","femake","female ",
                  "cis-female/femme","female (cis)","femail").contains(s.toLowerCase))
          "Female"
          else if(List("male","m","male-ish","maile","mal","male (cis)",
                         "make","male ","man","msle","mail","malr","cis man","cis male").contains(s.toLowerCase))
                "Male"
            else
                "Transgender"
        }
    def getFun(): UserDefinedFunction = udf(testudffunction _)
    }

Я использую sbt package, чтобы упаковать его в jars. Мой build.sbt выглядит следующим образом

name := "ParseGender"
    version := "1.0"
    organization := "testcase"
    scalaVersion := "2.11.8"
    val sparkVersion = "2.3.0"

    libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core" % sparkVersion,
    "org.apache.spark" %% "spark-sql" % sparkVersion)

И, наконец, чтобы проверить это, я написал небольшую программу pyspark

from pyspark.sql import functions as F
from pyspark.sql import types as T
import pandas as pd
from pyspark.sql import SparkSession
import numpy as np
from pyspark.sql.column import Column, _to_java_column, _to_seq

spark = SparkSession.builder \
     .master("local") \
     .appName("UDAF") \
     .getOrCreate()
spark.sparkContext.setLogLevel("WARN")

def test_udf(col):
  sc = spark.sparkContext
  _test_udf = sc._jvm.com.test.ParseGender.getFun()
  return Column(_test_udf.apply(_to_seq(sc, [col], _to_java_column)))

df = spark.createDataFrame(
  [("female",), ("male",), ("femail",)],
  ("text",)
)

df = df.withColumn('text2', test_udf(df['text']))
df.show(3)

Я запускаю его локально с spark-submit, вот так

spark-submit --jars /somepath/scala-2.11/parsegender_2.11-1.0.jar xyz.py

где xyz.py имеет вышеуказанный код pyspark.

После оформления отправки я получаю следующую ошибку

    _test_udf = sc._jvm.com.test.ParseGender.getFun()
    TypeError: 'JavaPackage' object is not callable

Я подозреваю, что это как-то связано с pyspark, который не может прочитать / обнаружить пакет, но я не уверен. Может ли кто-нибудь предоставить несколько советов о том, как это исправить?

...