Получение NullPointerException при сохранении набора данных Spark Java в местоположение - PullRequest
0 голосов
/ 10 октября 2018

Я читаю файл в наборы данных spark в Java, а затем добавляю новый столбец в набор данных, затем делаю разбиение на основе этого столбца и сохраняю результаты локально.

Я использую внешнюю библиотеку с открытым исходным кодом для добавления слоя кэширования с именем iceberg

//1. READ FILE
    Dataset<Row> df = spark.read().format("csv").option("header", "true").load("data.csv.gz");

// 2. READ JSON
            Object obj = new JSONParser().parse(new FileReader("jsonpath"));
            JSONObject jo = (JSONObject) obj;

            spark.udf().register("getJsonVal", new UDF1<String, String>() {
                @Override
                public String call(String key) {
                    return  (String) jo.get(key.substring(0, 5));
                }
            }, DataTypes.StringType);

    // 3. CREATE AND ADD NEW COLUMN TO EXISTING DATASET
            df = df.withColumn("cluster", functions.callUDF("getJsonVal", df.col("existing_col1")));
            //df.show();  

            df.createOrReplaceTempView("table2");
            String sql = "select count(distinct cluster) from table2";
            spark.sql(sql).show();

// 4. WRITE NEW PARTITIONED DATA TO A LOCAL LOCATION. THE SAVE IS GIVING ERROR BUT IF I WRITE USING EXISTING COLUMN, IT WORKS  
            df.withColumn("existing_col1", df.col("existing_col1")).sort("cluster").write().format("iceberg").mode("append").save(location);

Вот ошибка:

Exception in thread "main" java.lang.NullPointerException
    at com.netflix.iceberg.types.ReassignIds.field(ReassignIds.java:74)
    at com.netflix.iceberg.types.ReassignIds.field(ReassignIds.java:25)
    at com.netflix.iceberg.types.TypeUtil$VisitFieldFuture.get(TypeUtil.java:308)
    at com.google.common.collect.Iterators$8.next(Iterators.java:812)
    at com.google.common.collect.Lists.newArrayList(Lists.java:139)
    at com.google.common.collect.Lists.newArrayList(Lists.java:119)
    at com.netflix.iceberg.types.ReassignIds.struct(ReassignIds.java:52)
    at com.netflix.iceberg.types.ReassignIds.struct(ReassignIds.java:25)
    at com.netflix.iceberg.types.TypeUtil.visit(TypeUtil.java:341)
    at com.netflix.iceberg.types.TypeUtil$VisitFuture.get(TypeUtil.java:293)
    at com.netflix.iceberg.types.ReassignIds.schema(ReassignIds.java:37)
    at com.netflix.iceberg.types.ReassignIds.schema(ReassignIds.java:25)
    at com.netflix.iceberg.types.TypeUtil.visit(TypeUtil.java:313)
    at com.netflix.iceberg.types.TypeUtil.reassignIds(TypeUtil.java:122)
    at com.netflix.iceberg.spark.SparkSchemaUtil.convert(SparkSchemaUtil.java:163)
    at com.netflix.iceberg.spark.source.IcebergSource.createWriter(IcebergSource.java:66)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:254)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:225)
    at com.mkyong.hashing.App.main(App.java:105)
...