Я пытаюсь использовать scala
UDF в pyspark
Мой scala
udf выглядит следующим образом.
package com.ParseGender
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions._
object ParseGender{
def testudffunction(s: String): String = {
if(List("cis female","f","female","woman","femake","female ",
"cis-female/femme","female (cis)","femail").contains(s.toLowerCase))
"Female"
else if(List("male","m","male-ish","maile","mal","male (cis)",
"make","male ","man","msle","mail","malr","cis man","cis male").contains(s.toLowerCase))
"Male"
else
"Transgender"
}
def getFun(): UserDefinedFunction = udf(testudffunction _)
}
Я использую sbt package
, чтобы упаковать его в jars
. Мой build.sbt
выглядит следующим образом
name := "ParseGender"
version := "1.0"
organization := "testcase"
scalaVersion := "2.11.8"
val sparkVersion = "2.3.0"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion,
"org.apache.spark" %% "spark-sql" % sparkVersion)
И, наконец, чтобы проверить это, я написал небольшую программу pyspark
from pyspark.sql import functions as F
from pyspark.sql import types as T
import pandas as pd
from pyspark.sql import SparkSession
import numpy as np
from pyspark.sql.column import Column, _to_java_column, _to_seq
spark = SparkSession.builder \
.master("local") \
.appName("UDAF") \
.getOrCreate()
spark.sparkContext.setLogLevel("WARN")
def test_udf(col):
sc = spark.sparkContext
_test_udf = sc._jvm.com.test.ParseGender.getFun()
return Column(_test_udf.apply(_to_seq(sc, [col], _to_java_column)))
df = spark.createDataFrame(
[("female",), ("male",), ("femail",)],
("text",)
)
df = df.withColumn('text2', test_udf(df['text']))
df.show(3)
Я запускаю его локально с spark-submit
, вот так
spark-submit --jars /somepath/scala-2.11/parsegender_2.11-1.0.jar xyz.py
где xyz.py
имеет вышеуказанный код pyspark.
После оформления отправки я получаю следующую ошибку
_test_udf = sc._jvm.com.test.ParseGender.getFun()
TypeError: 'JavaPackage' object is not callable
Я подозреваю, что это как-то связано с pyspark, который не может прочитать / обнаружить пакет, но я не уверен. Может ли кто-нибудь предоставить несколько советов о том, как это исправить?