Java Столбец массива обработки UDF - PullRequest
0 голосов
/ 02 августа 2020

Я пишу java udf для обработки столбца типа массива.

Цель состоит в том, чтобы обработать массив строк, чтобы выбрать строку с наименьшей длиной

sqlContext.udf().register("NAME_SELECTOR", (UDF1<List<String>, String>) brandNames -> {
                          brandNames.sort(Comparator.comparing(String::length));
                          return brandNames.get(0);},DataTypes.StringType);

Ошибка была связана с типом ввода функции UDF. Я знаю, что в scala мне нужно использовать Seq[String] в качестве типа ввода, как насчет Java?

Вот сообщение об ошибке:

java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to java.util.List

1 Ответ

0 голосов
/ 02 августа 2020

Попробуйте это -

Используйте scala.collection.mutable.WrappedArray и преобразуйте его в список java, используя JavaConverters, затем отсортируйте его с помощью компаратора и выберите первую самую короткую строку -

Dataset<Row> df = spark.sql("select array('abc', 'ab', 'a') arr");
        df.printSchema();
        df.show(false);
        /**
         * root
         *  |-- arr: array (nullable = false)
         *  |    |-- element: string (containsNull = false)
         *
         * +------------+
         * |arr         |
         * +------------+
         * |[abc, ab, a]|
         * +------------+
         */

        // scala.collection.mutable.WrappedArray
        UserDefinedFunction shortestStringUdf = udf((WrappedArray<String> arr)  -> {
                    List<String> strings = new ArrayList<>(JavaConverters
                            .asJavaCollectionConverter(arr)
                            .asJavaCollection());
                    strings.sort(Comparator.comparing(String::length));
                    return strings.get(0);
                }
                , DataTypes.StringType);
        spark.udf().register("shortestString", shortestStringUdf);

        df.withColumn("a", expr("shortestString(arr)"))
        .show(false);
        /**
         * +------------+---+
         * |arr         |a  |
         * +------------+---+
         * |[abc, ab, a]|a  |
         * +------------+---+
         */

Если вы используете spark>=2.4, используйте функции высшего порядка для достижения того же результата without udf, как показано ниже -

 // spark>=2.4
        df.withColumn("arr_length", expr("TRANSFORM(arr, x -> length(x))"))
                .withColumn("a", expr("array_sort(arrays_zip(arr_length, arr))[0].arr"))
                .show(false);
        /**
         * +------------+----------+---+
         * |arr         |arr_length|a  |
         * +------------+----------+---+
         * |[abc, ab, a]|[3, 2, 1] |a  |
         * +------------+----------+---+
         */
...