Получение исключения нулевого указателя при попытке добавить столбец в наборе данных Spark в Java - PullRequest
0 голосов
/ 08 октября 2018

Я пытаюсь перебрать строки набора данных в Java, а затем получить доступ к определенному столбцу, чтобы найти его значение, хранящееся в качестве ключа в файле JSON, и получить его значение.Найденное значение необходимо сохранить как новое значение столбца в этой строке для всех строк.

Я вижу, что мои cluster_val, полученные из файла JSON, не равны NULL, но когда я пытаюсь добавить его в качестве столбцаЯ получаю Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost, executor driver): java.lang.NullPointerException

Пока у меня есть это:

Dataset<Row> df = spark.read().format("csv").load(path);
        df.foreach((ForeachFunction<Row>) row ->
    {
        String df_col_val = (String) row.get(6);
        System.out.println(row.get(6));
        if(df_col_val.length() > 5){
            df_col_val = df_col_val.substring(0, df_col_val.length() - 5 + 1); //NOT NULL
        }
        System.out.println(df_col_val); 
        String cluster_val = (String) jo.get(df_col_val); //NOT NULL
        System.out.println(cluster_val);
        df.withColumn("cluster", df.col(cluster_val));  // NULL POINTER EXCEPTION. WHY?

        df.show();

    });

Так что в основном мне нужна помощь в чтении набора данных строка за строкой и выполнении последующих операций, как описано выше.Невозможно найти много ссылок в Интернете.Пожалуйста, направьте меня, чтобы исправить источники, если это возможно.Также, если есть краткий способ сделать это, дайте мне знать.

Итак, я понял, что df.col(cluster_val) выдает исключение, так как столбец отсутствует.Как конвертировать имя строки столбца в тип столбца, необходимый для передачи в withColumn() функцию pf Dataset

ОБНОВЛЕНИЕ:

ТАК Я попробовал следующее, и вот новый столбец я пытаюсьчтобы получить новое значение col, используя udf, но оно будет нулевым, если использовать его следующим образом:

Dataset<Row> df = spark.read().format("csv").option("header", "true").load(path);

            Object obj = new JSONParser().parse(new FileReader("path to json"));
            JSONObject jo = (JSONObject) obj;

                df.withColumn("cluster", functions.lit((String) jo.get(df.col(df_col_val)))));
        df.show();

1 Ответ

0 голосов
/ 09 октября 2018

При использовании df.withColumn необходим первый аргумент в качестве имени столбца и второй аргумент в качестве значения для этого столбца.Если вы хотите добавить новый столбец с именем в качестве «cluster» и значением как из некоторого значения json, тогда вы можете использовать функцию «lit» как lit (cluster_val), где cluster_val содержит значение.

Вы должны импортировать «org».apache.spark.sql.functions._ "использовать освещенную функцию.

Надеюсь, это поможет.

...