Java Spark - Как вызвать UDF с несколькими столбцами в качестве аргумента - PullRequest
0 голосов
/ 02 апреля 2020

Я пытаюсь выполнить следующий код

SparkSession sparkSession = SparkSession
        .builder()
        .appName("test")
        .master("local")
      //.enableHiveSupport()
        .getOrCreate();

StructField[] structFields = new StructField[]{
        new StructField("FIRST", DataTypes.StringType, true, Metadata.empty()),
        new StructField("SECOND", DataTypes.StringType, true, Metadata.empty())
};

StructType structType = new StructType(structFields);

List<Row> rows = new ArrayList<>();
rows.add(RowFactory.create("1","2"));
rows.add(RowFactory.create("2","3"));
Dataset<Row> dataDs = sparkSession.createDataFrame(rows, structType);
UDF1 mode = new UDF1<String,String, String>() {

    public String call(String a,String b) throws Exception {
        return a+b;
    }

};
sparkSession.udf().register("mode",mode,DataTypes.StringType);
dataDs.withColumn("newCol",callUDF("mode",col("FIRST"),col("SECOND")));
dataDs.show();

Однако кажется, что я не могу объявить UDF с несколькими аргументами, как я делал с

UDF1 mode = new UDF1<String,String, String>() {
    public String call(String a,String b) throws Exception {
        return a+b;
    }
 };

Я хочу передать оба моих столбца в качестве ввода в UDF.

Я могу передать один объект в первом параметре, например, Integer или что-то еще, но я хочу передать несколько столбцов. как мне это сделать? Может ли кто-нибудь помочь мне здесь? Я искал inte rnet для этого, но не смог найти решение. Я новичок в искре.

1 Ответ

0 голосов
/ 02 апреля 2020

Я полностью согласен с комментариями @ emest_k и хочу добавить еще одну вещь. Потому что dataFrame является неизменным, поэтому вам нужно использовать новый dataFrame, чтобы увидеть вывод с помощью show (). Вот образец.

{
    ...

    UDF2 mode = new UDF2<String, String, String>() {
        public String call(String a, String b) throws Exception {
            return a + b;
        }
    };
    sparkSession.udf().register("mode", mode, DataTypes.StringType);
    Dataset<Row> newDataFrame = dataDs.withColumn("newCol", callUDF("mode", col("FIRST"), col("SECOND")));
    newDataFrame.show();
}
...