Я пытаюсь перебрать строки набора данных в Java, а затем получить доступ к определенному столбцу, чтобы найти его значение, хранящееся в качестве ключа в файле JSON, и получить его значение.Найденное значение необходимо сохранить как новое значение столбца в этой строке для всех строк.
Я вижу, что мои cluster_val
, полученные из файла JSON, не равны NULL, но когда я пытаюсь добавить его в качестве столбцаЯ получаю Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost, executor driver): java.lang.NullPointerException
Пока у меня есть это:
Dataset<Row> df = spark.read().format("csv").load(path);
df.foreach((ForeachFunction<Row>) row ->
{
String df_col_val = (String) row.get(6);
System.out.println(row.get(6));
if(df_col_val.length() > 5){
df_col_val = df_col_val.substring(0, df_col_val.length() - 5 + 1); //NOT NULL
}
System.out.println(df_col_val);
String cluster_val = (String) jo.get(df_col_val); //NOT NULL
System.out.println(cluster_val);
df.withColumn("cluster", df.col(cluster_val)); // NULL POINTER EXCEPTION. WHY?
df.show();
});
Так что в основном мне нужна помощь в чтении набора данных строка за строкой и выполнении последующих операций, как описано выше.Невозможно найти много ссылок в Интернете.Пожалуйста, направьте меня, чтобы исправить источники, если это возможно.Также, если есть краткий способ сделать это, дайте мне знать.
Итак, я понял, что df.col(cluster_val)
выдает исключение, так как столбец отсутствует.Как конвертировать имя строки столбца в тип столбца, необходимый для передачи в withColumn()
функцию pf Dataset
ОБНОВЛЕНИЕ:
ТАК Я попробовал следующее, и вот новый столбец я пытаюсьчтобы получить новое значение col, используя udf, но оно будет нулевым, если использовать его следующим образом:
Dataset<Row> df = spark.read().format("csv").option("header", "true").load(path);
Object obj = new JSONParser().parse(new FileReader("path to json"));
JSONObject jo = (JSONObject) obj;
df.withColumn("cluster", functions.lit((String) jo.get(df.col(df_col_val)))));
df.show();