Как сгруппировать различные диапазоны в новые категории столбцов Спарк SQL - PullRequest
0 голосов
/ 29 ноября 2018

Я использую spark sql 2.3.1 и извлекаю данные из схемы json следующим образом:

    auswertezeit|geschwindigkeit|strecke_id|      verkehrsstatus|         coordinates|
+-------------------+---------------+----------+--------------------+--------------------+
|2018-11-21T08:05:00|             13|         3|          Staugefahr|[[[7.0847794888, ...|
|2018-11-21T08:05:00|             53|         4|normales Verkehrs...|[[[7.0946672837, ...|
|2018-11-21T08:05:00|             44|         9|erh�hte Verkehrsb...|[[[7.0776131992, ...|
|2018-11-21T08:05:00|             48|        10|erh�hte Verkehrsb...|[[[7.0715549582, ...|
|2018-11-21T08:05:00|             19|        11|          Staugefahr|[[[7.0716958878, ...|
|2018-11-21T08:05:00|             43|        12|erh�hte Verkehrsb...|[[[7.0888459136, ...|
|2018-11-21T08:05:00|             19|        14|erh�hte Verkehrsb...|[[[7.0867958746, ...|
|2018-11-21T08:05:00|             25|        15|normales Verkehrs...|[[[7.0978591271, ...|
|2018-11-21T08:05:00|             24|        16|normales Verkehrs...|[[[7.0778572339, ...|
|2018-11-21T08:05:00|             25|        17|normales Verkehrs...|[[[7.0864326228, ...|
|2018-11-21T08:05:00|              9|        18|          Staugefahr|[[[7.0977177712, ...|
|2018-11-21T08:05:00|             15|        19|normales Verkehrs...|[[[7.1044319861, ...|
|2018-11-21T08:05:00|              7|        20|          Staugefahr|[[[7.0948248864, ...|
|2018-11-21T08:05:00|             43|        21|normales Verkehrs...|[[[7.1043943657, ...|
|2018-11-21T08:05:00|             37|        22|normales Verkehrs...|[[[7.1174474912, ...|
|2018-11-21T08:05:00|             33|        23|normales Verkehrs...|[[[7.1044889709, ...|
|2018-11-21T08:05:00|             32|        25|normales Verkehrs...|[[[7.0889611262, ...|
|2018-11-21T08:05:00|             33|        26|normales Verkehrs...|[[[7.0949180932, ...|
|2018-11-21T08:05:00|             31|        29|normales Verkehrs...|[[[7.1173394533, ...|
|2018-11-21T08:05:00|             46|        30|normales Verkehrs...|[[[7.1250258018, ...|
+-------------------+---------------+----------+--------------------+--------------------+

Я хочу сгруппировать различные диапазоны "verkehrstatus" в категории, используя UDF.

  def udfState () = udf[java.lang.String, java.lang.String] { a =>
    val x = a match {
      case "Staugefahr"                 => "Stau";
      case "erh�hte Verkehrsbelastung"  => "Stau";
      case "normales Verkehrsaufkommen" => "kein Stau";
    }; x;
  }

 val clean_data = resultDf.na.replace("verkehrsstatus", Map("aktuell nicht ermittelbar" -> "normales Verkehrsaufkommen"))
val datawithudf = clean_data.withColumn("state", udfState()($"verkehrsstatus"))

val finaldata = datawithudf.select($"auswertezeit" , $"geschwindigkeit", $"strecke_id", $"state", $"coordinates") .withColumnRenamed("state", "verkehrsstatus")

Но у меня есть это сообщение об ошибке:

 java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...