У меня есть Dataset<Row> inputDS
, который имеет 4 столбца, а именно Id, List<long> time, List<String> value, aggregateType
Я хочу добавить еще один столбец к Dataset
value_new
, используя функцию карты, эта функция карты принимает столбцы time
, value
и aggregateType
передает это в функцию getAggregate(String aggregateType, List<long> time, List<String> value)
и возвращает двойное значение при обработке параметров. Значение Double
, возвращаемое методом getAggregate
, будет новым значением столбца, т.е. значением value_new
Ввод набора данныхDS
+------+---+-----------+---------------------------------------------+---------------+
| Id| value | time |aggregateType |
+------+---------------+---------------------------------------------+---------------+
|0001 | [1.5,3.4,4.5]| [1551502200000,1551502200000,1551502200000] | Sum |
+------+---------------+---------------------------------------------+---------------+
Ожидаемый вывод набора данныхDS
+------+---------------+---------------------------------------------+---------------+-----------+
| Id| value | time |aggregateType | value_new |
+------+---------------+---------------------------------------------+---------------+-----------+
|0001 | [1.5,3.4,4.5]| [1551502200000,1551502200000,1551502200000] | Sum | 9.4 |
+------+---------------+---------------------------------------------+---------------+-----------+
Код, который я пробовал .
inputDS.withColumn("value_new",functions.lit(inputDS.map(new MapFunction<Row,Double>(){
public double call(Row row){
String aggregateType = row.getAS("aggregateType");
List<long> timeList = row.getList("time");
List<long> valueList= row.getList("value");
return getAggregate(aggregateType ,timeList,valueList);
}}),Encoders.DOUBLE())));
ERROR
Unsupported literal type class org.apache.spark.sql.Dataset [value:double]
Примечание Извините, если я неправильно использовал map
, и, пожалуйста, предложите мне, если есть какое-то решение.
Спасибо.!