Я думаю, я бы работал по-другому.
Превратите ваш набор данных в два набора данных работы: один для doc1 и один для doc 2.
Сначала разбейте свою строку на массив слов, а затем взорвитесь
Тогда все, что вам нужно сделать, это сохранить перекресток ..
Примерно так:
Dataset<Row> ds = spark.sql("select 'word1 word2' as document1, 'word2 word3' as document2");
ds.show();
Dataset<Row> ds1 = ds.select(functions.explode(functions.split(ds.col("document1"), " ")).as("word"));
Dataset<Row> ds2 = ds.select(functions.explode(functions.split(ds.col("document2"), " ")).as("word"));
Dataset<Row> intersection = ds1.join(ds2, ds1.col("word").equalTo(ds2.col("word"))).select(ds1.col("word").as("Common words"));
intersection.show();
Ouput:
+-----------+-----------+
| document1| document2|
+-----------+-----------+
|word1 word2|word2 word3|
+-----------+-----------+
+------------+
|Common words|
+------------+
| word2|
+------------+
В любом случае ,
если ваша цель - «вызвать» пользовательский UDF только на два столбца, вот как я бы это сделал:
1. Создайте свой UDF
UDF2<String, String, String> intersection = new UDF2<String, String, String>() {
@Override
public String call(String arg, String arg2) throws Exception {
String key = inter(arg, arg2);
return key;
}
private String inter(String arg1, String arg2) {
// this part of the implementation is just to stay in line with the previous part of this answer
List<String> list1 = Arrays.asList(arg1.split(" "));
return Stream.of(arg2.split(" ")).filter(list1::contains).collect(Collectors.joining(" "));
}
};
2. Зарегистрируйтесь и используйте это!
чистая ява
UserDefinedFunction intersect = functions.udf(intersection, DataTypes.StringType);
Dataset<Row> ds1 = ds.select(ds.col("document1"), ds.col("document2"), intersect.apply(ds.col("document1"), ds.col("document2")));
ds1.show();
SQL
spark.sqlContext().udf().register("intersect", intersection, DataTypes.StringType);
Dataset<Row> df = spark.sql("select document1, document2, "
+ "intersect(document1, document2) as RowKey1"
+ " from v_table");
df.show();
выход
+-----------+-----------+-------+
| document1| document2|RowKey1|
+-----------+-----------+-------+
|word1 word2|word2 word3| word2|
+-----------+-----------+-------+
Полный код
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;
public class StackOverflowUDF {
public static void main(String args[]) {
SparkSession spark = SparkSession.builder().appName("JavaWordCount").master("local").getOrCreate();
Dataset<Row> ds = spark.sql("select 'word1 word2' as document1, 'word2 word3' as document2");
ds.show();
UDF2<String, String, String> intersection = new UDF2<String, String, String>() {
@Override
public String call(String arg, String arg2) throws Exception {
String key = inter(arg, arg2);
return key;
}
private String inter(String arg1, String arg2) {
List<String> list1 = Arrays.asList(arg1.split(" "));
return Stream.of(arg2.split(" ")).filter(list1::contains).collect(Collectors.joining(" "));
}
};
UserDefinedFunction intersect = functions.udf(intersection, DataTypes.StringType);
Dataset<Row> ds1 = ds.select(ds.col("document1"), ds.col("document2"),
intersect.apply(ds.col("document1"), ds.col("document2")));
ds1.show();
ds1.printSchema();
ds.createOrReplaceTempView("v_table");
spark.sqlContext().udf().register("intersect", intersection, DataTypes.StringType);
Dataset<Row> df = spark
.sql("select document1, document2, " + "intersect(document1, document2) as RowKey1" + " from v_table");
df.show();
spark.stop();
}
}
НТН!