Как добавить новый столбец в фрейм данных Spark с использованием Java UDF - PullRequest
1 голос
/ 25 марта 2019

У меня есть Dataset<Row> inputDS, который имеет 4 столбца, а именно Id, List<long> time, List<String> value, aggregateType Я хочу добавить еще один столбец к Dataset value_new, используя функцию карты, эта функция карты принимает столбцы time, value и aggregateType передает это в функцию getAggregate(String aggregateType, List<long> time, List<String> value) и возвращает двойное значение при обработке параметров. Значение Double, возвращаемое методом getAggregate, будет новым значением столбца, т.е. значением value_new

Ввод набора данныхDS

 +------+---+-----------+---------------------------------------------+---------------+
 |    Id| value         |     time                                   |aggregateType  |
 +------+---------------+---------------------------------------------+---------------+
 |0001  |  [1.5,3.4,4.5]| [1551502200000,1551502200000,1551502200000] | Sum           |
 +------+---------------+---------------------------------------------+---------------+

Ожидаемый вывод набора данныхDS

 +------+---------------+---------------------------------------------+---------------+-----------+
 |    Id| value         |     time                                    |aggregateType  | value_new |
 +------+---------------+---------------------------------------------+---------------+-----------+
 |0001  |  [1.5,3.4,4.5]| [1551502200000,1551502200000,1551502200000] | Sum           |   9.4     |
 +------+---------------+---------------------------------------------+---------------+-----------+

Код, который я пробовал .

 inputDS.withColumn("value_new",functions.lit(inputDS.map(new MapFunction<Row,Double>(){

 public double call(Row row){
 String aggregateType = row.getAS("aggregateType");
 List<long> timeList = row.getList("time");
 List<long> valueList= row.getList("value");  

 return  getAggregate(aggregateType ,timeList,valueList);    

 }}),Encoders.DOUBLE())));

ERROR

 Unsupported literal type class org.apache.spark.sql.Dataset [value:double]

Примечание Извините, если я неправильно использовал map, и, пожалуйста, предложите мне, если есть какое-то решение.

Спасибо.!

1 Ответ

0 голосов
/ 26 марта 2019

Вы получаете ошибку, потому что вы пытаетесь создать литерал функции (lit()), используя результат Dataset.map(), который, как вы можете видеть в документации, представляет собой набор данных. В API для Dataset.withColumn() вы можете видеть, что вам нужен аргумент в виде столбца.

Кажется, вам нужно создать пользовательскую функцию. Взгляните на Как вызвать UDF для Spark DataFrame с использованием JAVA?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...