У меня проблема с API-интерфейсом WINDOW FUNCTION:
мой вопрос похож на этот: Как удалить дубликаты, используя условия
У меня есть набор данных:
+---+----------+---------+
| ID| VALUEE| OTHER|
+---+----------+---------+
| 1| null|something|
| 1|[1.0, 0.0]|something|
| 1|[1.0, 0.0]|something|
| 1|[0.0, 2.0]|something|
| 1|[3.0, 5.0]|something|
| 2|[3.0, 5.0]|something|
| 1|[3.0, 5.0]|something|
| 2| null|something|
| 3|[3.0, 5.0]|something|
| 4| null|something|
+---+----------+---------+
Я хочу сохранить только один идентификатор каждого (без дубликатов), и мне не важно значение, но я предпочитаю ненулевое значение
ожидаемый результат
+---+----------+---------+
| ID| VALUEE| OTHER|
+---+----------+---------+
| 1|[0.0, 2.0]|something|
| 3|[3.0, 5.0]|something|
| 4| null|something|
| 2|[3.0, 5.0]|something|
+---+----------+---------+
windowsFunction с функцией Aggregate first () не работают, тогда как с row_number () она работает
, но я не понимаю, почему сначала не работают
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.*;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.spark_project.guava.collect.ImmutableList;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import static org.apache.spark.sql.types.DataTypes.IntegerType;
import static org.apache.spark.sql.types.DataTypes.StringType;
import static org.apache.spark.sql.types.DataTypes.createStructField;
public class TestSOF {
public static void main(String[] args) {
StructType schema = new StructType(
new StructField[]{
createStructField("ID", IntegerType, false),
createStructField("VALUEE", DataTypes.createArrayType(DataTypes.DoubleType), true),
createStructField("OTHER", StringType, true),
});
double [] a =new double[]{1.0,0.0};
double [] b =new double[]{3.0,5.0};
double [] c =new double[]{0.0,2.0};
List<Row> listOfdata = new ArrayList();
listOfdata.add(RowFactory.create(1,null,"something"));
listOfdata.add(RowFactory.create(1,a,"something"));
listOfdata.add(RowFactory.create(1,a,"something"));
listOfdata.add(RowFactory.create(1,c,"something"));
listOfdata.add(RowFactory.create(1,b,"something"));
listOfdata.add(RowFactory.create(2,b,"something"));
listOfdata.add(RowFactory.create(1,b,"something"));
listOfdata.add(RowFactory.create(2,null,"something"));
listOfdata.add(RowFactory.create(3,b,"something"));
listOfdata.add(RowFactory.create(4,null,"something"));
List<Row> rowList = ImmutableList.copyOf(listOfdata);
SparkSession sparkSession = new SparkSession.Builder().config("spark.master", "local[*]").getOrCreate();
sparkSession.sparkContext().setLogLevel("ERROR");
Dataset<Row> dataset = sparkSession.createDataFrame(rowList,schema);
dataset.show();
WindowSpec windowSpec = Window.partitionBy(dataset.col("ID")).orderBy(dataset.col("VALUEE").asc_nulls_last());
// wind solution
// lost information
Dataset<Row> dataset0 = dataset.groupBy("ID").agg(functions.first(dataset.col("VALUEE"), true));
Dataset<Row> dataset1 = dataset.withColumn("new",functions.row_number().over(windowSpec)).where("new = 1").drop("new");
//do not work
Dataset<Row> dataset2 = dataset.withColumn("new",functions.first("VALUEE",true).over(windowSpec)).drop("new");
JavaRDD<Row> rdd =
dataset.toJavaRDD()
.groupBy(row -> row.getAs("ID"))
.map(g -> {
Iterator<Row> iter =g._2.iterator();
Row rst = null;
Row tmp;
while(iter.hasNext()){
tmp = iter.next();
if (tmp.getAs("VALUEE") != null) {
rst=tmp;
break;
}
if(rst==null){
rst=tmp;
}
}
return rst;
});
Dataset<Row> dataset3 = sparkSession.createDataFrame(rdd, schema);
dataset0.show();
dataset1.show();
dataset2.show();
dataset3.show();
}
}