Мы пытаемся написать функцию scala udf и вызвать ее из функции карты в pyspark.Схема даты и времени довольно сложна, столбцы, которые мы хотим передать этой функции, являются массивом StructType.
trip_force_speeds = trip_details.groupby("vehicle_id","driver_id", "StartDtLocal", "EndDtLocal")\
.agg(collect_list(struct(col("event_start_dt_local"),
col("force"),
col("speed"),
col("sec_from_start"),
col("sec_from_end"),
col("StartDtLocal"),
col("EndDtLocal"),
col("verisk_vehicle_id"),
col("trip_duration_sec")))\
.alias("trip_details"))
В нашей функции карты нам нужно выполнить некоторые вычисления.
def calculateVariables(rec: Row):HashMap[String,Float] = {
val trips = rec.getAs[List]("trips")
val base_variables = new HashMap[String, Float]()
val entropy_variables = new HashMap[String, Float]()
val week_day_list = List("monday", "tuesday", "wednesday", "thursday", "friday")
for (trip <- trips)
{
if (trip("start_dt_local") >= trip("StartDtLocal") && trip("start_dt_local") <= trip("EndDtLocal"))
{
base_variables("trip_summary_count") += 1
if (trip("duration_sec").toFloat >= 300 && trip("duration_sec").toFloat <= 1800) {
base_variables ("bounded_trip") += 1
base_variables("bounded_trip_duration") = trip("duration_sec") + base_variables("bounded_trip_duration")
base_variables("total_bin_1") += 30
base_variables("total_bin_2") += 30
base_variables("total_bin_3") += 60
base_variables("total_bin_5") += 60
base_variables("total_bin_6") += 30
base_variables("total_bin_7") += 30
}
if (trip("duration_sec") > 120 && trip("duration_sec") < 21600 )
{
base_variables("trip_count") += 1
}
base_variables("trip_distance") += trip("distance_km")
base_variables("trip_duration") = trip("duration_sec") + base_variables("trip_duration")
base_variables("speed_event_distance") = trip("speed_event_distance_km") + base_variables("speed_event_distance")
base_variables("speed_event_duration") = trip("speed_event_duration_sec") + base_variables("speed_event_duration")
base_variables("speed_event_distance_ratio") = trip("speed_distance_ratio") + base_variables("speed_event_distance_ratio")
base_variables("speed_event_duration_ratio") = trip("speed_duration_ratio") + base_variables("speed_event_duration_ratio")
}
}
return base_variables
}
когда мы пытались скомпилировать код scala, мы получили ошибку
я пытался использовать Row, но получил эту ошибку
"error: виды аргументов типа (Список) не соответствуют ожидаемым типам параметров типа (тип T). Параметры типа List не соответствуют ожидаемым параметрам типа T: тип List имеет один параметр типа, но тип T не имеет ни одного - "
в моем случае поездка представляет собой список строк.это схема
StructType(List(StructField(verisk_vehicle_id,StringType,true),StructField(verisk_driver_id,StringType,false),StructField(StartDtLocal,TimestampType,true),StructField(EndDtLocal,TimestampType,true),StructField(trips,ArrayType(StructType(List(StructField(week_start_dt_local,TimestampType,true),StructField(week_end_dt_local,TimestampType,true),StructField(start_dt_local,TimestampType,true),StructField(end_dt_local,TimestampType,true),StructField(StartDtLocal,TimestampType,true),StructField(EndDtLocal,TimestampType,true),StructField(verisk_vehicle_id,StringType,true),StructField(duration_sec,FloatType,true),StructField(distance_km,FloatType,true),StructField(speed_distance_ratio,FloatType,true),StructField(speed_duration_ratio,FloatType,true),StructField(speed_event_distance_km,FloatType,true),StructField(speed_event_duration_sec,FloatType,true))),true),true),StructField(trip_details,ArrayType(StructType(List(StructField(event_start_dt_local,TimestampType,true),StructField(force,FloatType,true),StructField(speed,FloatType,true),StructField(sec_from_start,FloatType,true),StructField(sec_from_end,FloatType,true),StructField(StartDtLocal,TimestampType,true),StructField(EndDtLocal,TimestampType,true),StructField(verisk_vehicle_id,StringType,true),StructField(trip_duration_sec,FloatType,true))),true),true)))
что-то не так в том, как мы определили сигнатуру функции, которую мы пытались переопределить типом структуры spark, но это не сработало для меня.
Я изPython, и я столкнулся с некоторыми проблемами с производительностью в работе с Python, поэтому я решил написать эту функцию карты в Scala.