Невозможно понять UDF в Spark и особенно в Java - PullRequest
0 голосов
/ 09 октября 2018

Я пытаюсь создать новый столбец в наборах данных Spark на основе значения другого столбца.Значение другого столбца ищется в файле json как ключ, и возвращается его значение, которое является значением, которое будет использоваться для нового столбца.

Вот код, который я пробовал, но он не работает, и яЯ не уверен, как работает UDF.Как добавить столбец в этом случае, используя withColumn или udf?

Dataset<Row> df = spark.read().format("csv").option("header", "true").load("file path");
        Object obj = new JSONParser().parse(new FileReader("json path"));
        JSONObject jo = (JSONObject) obj;

        df = df.withColumn("cluster", functions.lit(jo.get(df.col("existing col_name")))));

Любая помощь будет оценена.Заранее спасибо!

Ответы [ 2 ]

0 голосов
/ 10 октября 2018

Спасибо @Константин.Я смог лучше понять UDF из вашего примера.Вот мой код Java:

        Object obj = new JSONParser().parse(new FileReader("json path"));
        JSONObject jo = (JSONObject) obj;

        spark.udf().register("getJsonVal", new UDF1<String, String>() {
            @Override
            public String call(String key) {
                return  (String) jo.get(key.substring(0, 5));
            }
        }, DataTypes.StringType);

        df = df.withColumn("cluster", functions.callUDF("getJsonVal", df.col("existing col_name")));
        df.show(); // SHOWS NEW CLUSTER COLUMN
0 голосов
/ 09 октября 2018

Spark позволяет создавать пользовательские пользовательские функции (UDF) с помощью функции udf .

Ниже приведен фрагмент кода определения UDF.

val obj = new JSONParser().parse(new FileReader("json path"));
val jo = obj.asInstanceOf[JSONObject];

def getJSONObject(key: String) = {
   jo.get(key)
}

Как только вы определили свою функцию, вы можете преобразовать ее в UDF следующим образом:

 val getObject = udf(getJSONObject _)

Существует два подхода к использованию UDF.

  1. df.withColumn("cluster", lit(getObject(col("existing_col_name"))))

  2. Если вы используете spark sql, вы должны зарегистрировать свой udf в sqlContext, прежде чем использовать его.

    spark.sqlContext.udf.register("get_object", getJSONObject _)

    И тогда выможно использовать как

    spark.sql("select get_object(existing_column) from some_table")

Из них, использование которых является абсолютно субъективным.

...