Обновить значение в столбце типа структуры в java искре - PullRequest
2 голосов
/ 05 мая 2020

Мне нужна возможность обновлять значение во вложенном наборе данных. Для этого у меня есть вложенный набор данных в Spark. Он имеет следующую структуру схемы: -

root

 |-- field_a: string (nullable = false)

 |-- field_b: struct (nullable = true)

 |    |-- field_d: struct(nullable = false)
          |-- field_not_to_update: string(nullable = true)

 |        |-- field_to_update: string(nullable = false)
 |   field_c: string (nullable = false)

Теперь я хотел обновить значение в field_to_update в наборе данных. Я пробовал

aFooData.withColumn("field_b.field_d.field_to_update", lit("updated_val")

Также пробовал

aFooData.foreach(new ClassWithForEachFunction());

, где ClassWithForEachFunction implements ForEachFunction<Row> и имеет метод public void call(Row aRow) для обновления атрибута field_to_update. Пробовал то же самое с lamda, но он выбрасывал Task not serializable exception, поэтому должен go для длительного процесса.

Ни один из них пока не приносит результатов, и я получаю тот же набор данных с foreach и новым столбцом с именем field_b.field_d.field_to_update во втором случае. Есть ли другие предложения по поводу того же?

Ответы [ 3 ]

0 голосов
/ 15 мая 2020

Вам нужно будет восстановить всю схему, вы можете сделать это в одном экземпляре с помощью следующего предложения.

import org.apache.spark.sql.functions.{lit, struct}

df.select(
  df("field_a"), // keep the fields that don't change
  struct( // the field at first level must be reconstructed
     lit("updated_value") as "field_to_update", // transform or set the new elements
     df("fb.field_not_to_update") as "field_not_to_update" // keep the unchanged sub elements and keep the last name
  ) as "field_b", // and we have to keep the name
  df("field_c")
)

Синтаксис будет таким же в java

0 голосов
/ 24 мая 2020

Подход, более похожий на «Java», заключался бы в преобразовании фрейма данных в (типизированный) набор данных и последующем использовании вызова map для изменения данных. С точки зрения Java код прост в обращении. Но недостатком является то, что вам понадобятся три Java классов bean-компонентов для данной схемы.

Dataset<Bean1> ds = df.as(Encoders.bean(Bean1.class));

Dataset<Bean1> updatedDs = ds.map((MapFunction<Bean1, Bean1>) row -> {
    row.getField_b().getField_d().setField_to_update("updated");
    return row;
}, Encoders.bean(Bean1.class));

с тремя классами Bean

public static class Bean1 implements Serializable {
    private String field_a;
    private Bean2 field_b;
    private String field_c;

    //getters and setters
}

public static class Bean2 implements Serializable {
    private Bean3 field_d;

    //getter and setter
}

public static class Bean3 implements Serializable {
    private String field_not_to_update;
    private String field_to_update;

    //getters and setters
}
0 голосов
/ 05 мая 2020

Пожалуйста, проверьте код ниже.

  • Извлечь поля из структуры
  • Обновить необходимое поле.
  • Восстановить структуру.
scala> df.show(false)
+-------+--------------+
|field_a|field_b       |
+-------+--------------+
|parentA|[srinivas, 20]|
|parentB|[ravi, 30]    |
+-------+--------------+


scala> df.printSchema
root
 |-- field_a: string (nullable = true)
 |-- field_b: struct (nullable = true)
 |    |-- field_to_update: string (nullable = true)
 |    |-- field_not_to_update: integer (nullable = true)


scala> df.select("field_a","field_b.field_to_update","field_b.field_not_to_update").withColumn("field_to_update",lit("updated_val")).select(col("field_a"),struct(col("field_to_update"),col("field_not_to_update")).as("field_b")).show(false)
+-------+-----------------+
|field_a|field_b          |
+-------+-----------------+
|parentA|[updated_val, 20]|
|parentB|[updated_val, 30]|
+-------+-----------------+

...