Почему UDF не распознает столбец данных? - PullRequest
1 голос
/ 20 марта 2019

Предположим, у меня есть следующий фрейм данных:

+-----------------+---------------------+
|       document1  |   document2        |
+-----------------+---------------------+
|    word1 word2  |   word2 word3       |
+-----------------+---------------------+

Мне нужно добавить к этому фрейму данных новый столбец, называемый пересечением, который представляет ИНТЕРСЕКЦИЮ сходства между документом 1 и документом 2.

Как мне манипулировать значением в столбце. Я определяю функцию с именем пересечение, принимающую две строки на входе, но я не могу применить ее к типам столбцов. Я думаю, что я должен использовать функции UDF. Как я могу сделать это в Java. Отмечая, что я использую искру 2.3.0.

Я попробовал следующее:

SparkSession spark = SparkSession.builder().appName("spark session example").master("local[*]")
                .config("spark.sql.warehouse.dir", "/file:C:/tempWarehouse")
                .config("spark.sql.caseSensitive", "true")
                .getOrCreate();

sqlContext.udf().register("intersection", new UDF2<String, String, Double>() {
            @Override
            public Double call(String arg, String arg2) throws Exception {
            double key = inter(arg, arg2);
            return key;
            }
            }, DataTypes.DoubleType);
  v.registerTempTable("v_table");

Dataset<Row> df = spark.sql("select v_table.document, v_table.document1, "
                + "intersection(v_table.document, v_table.document1) as RowKey1,"
                + " from v_table");
        df.show();

но я получаю следующее исключение:

    INFO SparkSqlParser: Parsing command: select v_table.document, v_table.document1, intersection(v_table.document, v_table.document1) as RowKey1, from v_table
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '`v_table.document`' given input columns: []; line 1 pos 7

Если я удаляю + ", intersection(v.doc1, v.doc2) as RowKey1," из запроса, выборка работает нормально. Любое предложение, пожалуйста? Кроме того, пожалуйста, как я могу использовать тот же подход, используя только на фрейме данных, а не так, как я делаю в SQL?

схема "v" с использованием v.printSchema();:

root
 |-- document: string (nullable = true)
 |-- document1: string (nullable = true)

1 Ответ

2 голосов
/ 20 марта 2019

Я думаю, я бы работал по-другому.

Превратите ваш набор данных в два набора данных работы: один для doc1 и один для doc 2. Сначала разбейте свою строку на массив слов, а затем взорвитесь Тогда все, что вам нужно сделать, это сохранить перекресток ..

Примерно так:

Dataset<Row> ds = spark.sql("select 'word1 word2' as document1, 'word2 word3' as document2");
ds.show();

Dataset<Row> ds1 = ds.select(functions.explode(functions.split(ds.col("document1"), " ")).as("word"));
Dataset<Row> ds2 = ds.select(functions.explode(functions.split(ds.col("document2"), " ")).as("word"));      

Dataset<Row> intersection = ds1.join(ds2, ds1.col("word").equalTo(ds2.col("word"))).select(ds1.col("word").as("Common words"));
intersection.show();

Ouput:

+-----------+-----------+
|  document1|  document2|
+-----------+-----------+
|word1 word2|word2 word3|
+-----------+-----------+
+------------+
|Common words|
+------------+
|       word2|
+------------+

В любом случае , если ваша цель - «вызвать» пользовательский UDF только на два столбца, вот как я бы это сделал:

1. Создайте свой UDF

UDF2<String, String, String> intersection = new UDF2<String, String, String>() {
    @Override
    public String call(String arg, String arg2) throws Exception {
        String key = inter(arg, arg2);
        return key;
    }

    private String inter(String arg1, String arg2) {
        // this part of the implementation is just to stay in line with the previous part of this answer
        List<String> list1 = Arrays.asList(arg1.split(" "));
        return Stream.of(arg2.split(" ")).filter(list1::contains).collect(Collectors.joining(" "));
    }
};

2. Зарегистрируйтесь и используйте это!

чистая ява

UserDefinedFunction intersect = functions.udf(intersection, DataTypes.StringType);      

Dataset<Row> ds1 = ds.select(ds.col("document1"), ds.col("document2"), intersect.apply(ds.col("document1"), ds.col("document2")));
ds1.show();

SQL

spark.sqlContext().udf().register("intersect", intersection, DataTypes.StringType);
Dataset<Row> df = spark.sql("select document1, document2, "
                + "intersect(document1, document2) as RowKey1"
                + " from v_table");
df.show();

выход

+-----------+-----------+-------+
|  document1|  document2|RowKey1|
+-----------+-----------+-------+
|word1 word2|word2 word3|  word2|
+-----------+-----------+-------+

Полный код

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;

public class StackOverflowUDF {
    public static void main(String args[]) {
        SparkSession spark = SparkSession.builder().appName("JavaWordCount").master("local").getOrCreate();

        Dataset<Row> ds = spark.sql("select 'word1 word2' as document1, 'word2 word3' as document2");
        ds.show();

        UDF2<String, String, String> intersection = new UDF2<String, String, String>() {
            @Override
            public String call(String arg, String arg2) throws Exception {
                String key = inter(arg, arg2);
                return key;
            }

            private String inter(String arg1, String arg2) {
                List<String> list1 = Arrays.asList(arg1.split(" "));
                return Stream.of(arg2.split(" ")).filter(list1::contains).collect(Collectors.joining(" "));
            }
        };

        UserDefinedFunction intersect = functions.udf(intersection, DataTypes.StringType);

        Dataset<Row> ds1 = ds.select(ds.col("document1"), ds.col("document2"),
                intersect.apply(ds.col("document1"), ds.col("document2")));
        ds1.show();
        ds1.printSchema();

        ds.createOrReplaceTempView("v_table");

        spark.sqlContext().udf().register("intersect", intersection, DataTypes.StringType);
        Dataset<Row> df = spark
                .sql("select document1, document2, " + "intersect(document1, document2) as RowKey1" + " from v_table");
        df.show();
        spark.stop();

    }
}

НТН!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...