Как преобразовать столбцы набора данных <Row>в непримитивный тип данных - PullRequest
2 голосов
/ 26 марта 2019

У меня есть Dataset<Row>, в котором есть четыре столбца из четырех двух столбцов. Непримитивные типы данных List<Long> and List<String>.

  +------+---------------+---------------------------------------------+---------------+
  |    Id| value         |     time                                      |aggregateType  |
  +------+---------------+---------------------------------------------+---------------+
  |0001  |  [1.5,3.4,4.5]| [1551502200000,1551502200000,1551502200000] | Sum             |
  +------+---------------+---------------------------------------------+---------------+

У меня есть UDF3, который принимает три аргумента и возвращаетDouble значение UDF3<String,List<Long>,List<String>,Double>.

Поэтому, когда я вызываю UDF, он выдает исключение, говорящее

Ошибка

caused by java.lang.classcastexception scala.collection.mutable.wrappedarray$ofref cannot be cast to java.lang.List

Но если яизменил типы на String как UDF3<String,String,String,Double> не жаловаться.

Код, который выдает выше Исключение

 UDF3<String,List<Long>,List<String>,Double> getAggregate = new UDF3<String,List<Long>,List<String>,Double>() {

 public Double call(String t1,List<Long> t2,List<String> t3) throws Exception {

 //do some process to return double

  return double;
  }

  sparkSession.udf().register("getAggregate_UDF",getAggregate, DataTypes.DoubleType);

  inputDS = inputDs.withColumn("value_new",callUDF("getAggregate_UDF",col("aggregateType"),col("time"),col("value")));

Код после изменения всех типовв строку

 UDF3<String,String,String,Double> getAggregate = new UDF3<String,String,String,Double>() {

 public Double call(String t1,String t2,String t3) throws Exception {

 //code to convert t2 and t3 to List<Long> and List<String> respectively

 //do some process to return double

  return double;
  }

  sparkSession.udf().register("getAggregate_UDF",getAggregate, DataTypes.DoubleType);

  inputDS = inputDs.withColumn("value_new",callUDF("getAggregate_UDF",col("aggregateType"),col("time").cast("String"),col("value").cast("String")));

Приведенный выше код работает, но с String to List преобразованием вручную.

Требуется помощь

I) Какбросьте Непримитивные типы данных List<Long> and List<String> в наборе данных, чтобы преодолеть caused by java.lang.classcastexception scala.collection.mutable.wrappedarray$ofref cannot be cast to java.lang.List

II), пожалуйста, предложите мне, если есть какое-то решение

Спасибо.

Ответы [ 2 ]

3 голосов
/ 26 марта 2019

Ваш UDF всегда будет получать экземпляры WrappedArray вместо List, потому что именно так они хранятся в движке.

Вам нужно написать что-то вроде этого:

import scala.collection.mutable.WrappedArray;
import scala.collection.JavaConversions;

UDF3<String, WrappedArray<Long>, WrappedArray<String>, Double> myUDF = new UDF3<String, WrappedArray<Long>, WrappedArray<String>, Double> () {
      public Double call(String param1, WrappedArray<Long> param2, WrappedArray<String> param3) throws Exception {
        List<Long> param1AsList = JavaConversions.seqAsJavaList(param1);
        List<String> param2AsList = JavaConversions.seqAsJavaList(param2);

        ... do work ...

        return myDoubleResult;
    }
};
1 голос
/ 26 марта 2019

Вот мой пример, вы должны использовать WrappedArray для получения массива и преобразования в список

 /*
     +------+---------------+---------------------------------------------+---------------+
     |    Id| value         |     time                                      |aggregateType  |
     +------+---------------+---------------------------------------------+---------------+
     |0001  |  [1.5,3.4,4.5]| [1551502200000,1551502200000,1551502200000] | Sum             |
     +------+---------------+---------------------------------------------+---------------+
     **/

    StructType dataSchema = new StructType(new StructField[] {createStructField("Id", DataTypes.StringType, true),
                                                              createStructField("value",
                                                                                DataTypes.createArrayType(DataTypes.DoubleType,
                                                                                                          false),
                                                                                false),

                                                              createStructField("time",
                                                                                DataTypes.createArrayType(DataTypes.LongType,
                                                                                                          false),
                                                                                false),
                                                              createStructField("aggregateType",
                                                                                DataTypes.StringType,
                                                                                true),});

    List<Row> data = new ArrayList<>();

    data.add(RowFactory.create("0001",
                               Arrays.asList(1.5, 3.4, 4.5),
                               Arrays.asList(1551502200000L, 1551502200000L, 1551502200000L),
                               "sum"));
    Dataset<Row> example = spark.createDataFrame(data, dataSchema);
    example.show(false);

    UDF3<String, WrappedArray<Long>, WrappedArray<Double>, Double> myUDF = (param1, param2, param3) -> {

        List<Long> param1AsList = JavaConversions.seqAsJavaList(param2);
        List<Double> param2AsList = JavaConversions.seqAsJavaList(param3);

        //Example
        double myDoubleResult = 0;
        if ("sum".equals(param1)) {

            myDoubleResult = param2AsList.stream()
                                         .mapToDouble(f -> f)
                                         .sum();
        }

        return myDoubleResult;
    };

    spark.udf()
         .register("myUDF", myUDF, DataTypes.DoubleType);

    example = example.withColumn("new", callUDF("myUDF", col("aggregateType"), col("time"), col("value")));
    example.show(false);

Вы можете получить его из github

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...