Я определил несколько простых функций, таких как:
def median_func(xs):
List_median=sorted(xs)
if len(List_median)%2==0:
result=(List_median[int(len(List_median)/2) - 1] + List_median[int(len(List_median)/2)])/2
else:
result=List_median[int(len(List_median)/2)]
return result
## --------------------- ##
def max_func(xs):
List_max=sorted(xs)
return List_max[-1]
## --------------------- ##
def min_func(xs):
List_min=sorted(xs)
return List_min[0]
И определил некоторые лямбды как:
import pyspark.sql.functions as sf
median_udf = sf.udf(lambda xs: median_func(xs), DoubleType())
max_udf = sf.udf(lambda xs: max_func(xs), IntegerType())
min_udf = sf.udf(lambda xs: min_func(xs), DoubleType())
В PySpark я использую их как:
data_frame = data_frame.withColumn("Rolling_median_lat", median_udf(column_latitude))\
.withColumn("Rolling_median_lon", median_udf(column_longitude))\
.withColumn("Rolling_max_deltatime", max_udf(column_deltatime))
Когда я запускаю выше с Python 2.7 и PySpark 2.2.0, все работает нормально.Но когда я пробую тот же код с Python 3.6, я вижу следующую проблему:
Py4JError: An error occurred while calling None.org.apache.spark.sql.execution.python.UserDefinedPythonFunction. Trace:
py4j.Py4JException: Constructor org.apache.spark.sql.execution.python.UserDefinedPythonFunction([class java.lang.String, class org.apache.spark.api.python.PythonFunction, class org.apache.spark.sql.types.DoubleType$, class java.lang.Integer, class java.lang.Boolean]) does not exist
at py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:179)
at py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:196)
at py4j.Gateway.invoke(Gateway.java:235)
at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
Я не уверен, в чем проблема.Я пытался совершить несколько ударов и попыток (не уверен, стоит ли их здесь упоминать), но ничего не работает.Что я делаю не так?
РЕДАКТИРОВАТЬ:
Переменные, которые я определил:
column_latitude
Это дает:
Column<b'array(latitude, Lag_latitude_1, Lead_latitude_1, Lag_latitude_2, Lead_latitude_2, Lag_latitude_3, Lead_latitude_3, Lag_latitude_4, Lead_latitude_4)'>
Так что этипростые строковые массивы.
РЕДАКТИРОВАТЬ : Вот оригинал dataframe
У меня есть:
data_frame.head(2)
Это дает мне:
[Row(id=1234, movementdatetime=datetime.datetime(2017, 9, 4, 13, 57, 16), latitude=38.477, longitude=13.256, deltaTime_sec=3459, Lag_latitude_1=38.4593, Lead_latitude_1=38.4872, Lag_longitude_1=13.4902, Lead_longitude_1=13.1767, Lag_deltatime_1=25531, Lead_deltatime_1=1212, Lag_latitude_2=38.3432, Lead_latitude_2=39.5649, Lag_longitude_2=15.1879, Lead_longitude_2=2.6392, Lag_deltatime_2=3280, Lead_deltatime_2=20623078, Lag_latitude_3=38.331, Lead_latitude_3=39.5649, Lag_longitude_3=15.3842, Lead_longitude_3=2.6392, Lag_deltatime_3=3588, Lead_deltatime_3=14580, Lag_latitude_4=38.324, Lead_latitude_4=39.5649, Lag_longitude_4=15.6001, Lead_longitude_4=2.6391, Lag_deltatime_4=0, Lead_deltatime_4=7199),
Row(id=2345, movementdatetime=datetime.datetime(2017, 9, 4, 14, 17, 28), latitude=38.4872, longitude=13.1767, deltaTime_sec=1212, Lag_latitude_1=38.477, Lead_latitude_1=39.5649, Lag_longitude_1=13.256, Lead_longitude_1=2.6392, Lag_deltatime_1=3459, Lead_deltatime_1=20623078, Lag_latitude_2=38.4593, Lead_latitude_2=39.5649, Lag_longitude_2=13.4902, Lead_longitude_2=2.6392, Lag_deltatime_2=25531, Lead_deltatime_2=14580, Lag_latitude_3=38.3432, Lead_latitude_3=39.5649, Lag_longitude_3=15.1879, Lead_longitude_3=2.6391, Lag_deltatime_3=3280, Lead_deltatime_3=7199, Lag_latitude_4=38.331, Lead_latitude_4=39.5649, Lag_longitude_4=15.3842, Lead_longitude_4=2.6391, Lag_deltatime_4=3588, Lead_deltatime_4=10803)]
В основном у меня есть несколько столбцов (девять), из которых мне нужно вычислить медиану.