как сопоставить каждую строку scala кадра данных с новой схемой - PullRequest
0 голосов
/ 25 января 2020

Я обрабатываю потоковые события разных типов и разных схем в spark с scala, и мне нужно проанализировать их и сохранить их в формате, который легко обрабатывать обычным способом c.

У меня есть датафрейм событий, который выглядит следующим образом:

val df = Seq(("{\"a\": 1, \"b\": 2, \"c\": 3 }", "One", "001") ,("{\"a\": 6, \"b\": 2, \"d\": 2, \"f\": 8 }", "Two", "089"), ("{\"a\": 3, \"b\": 4, \"c\": 6 }", "One", "123")).toDF("col1", "col2", "col3")

, это:

+------------------------------------+--------+------+
|   body                             |   type |   id |
+------------------------------------+--------+------+
|{"a": 1, "b": 2, "c": 3 }           |   "One"|   001|
|{"a": 6, "d": 2, "f": 8, "g": 10}   |   "Two"|   089|
|{"a": 3, "b": 4, "c": 6 }           | "Three"|   123|
+------------------------------------+--------+------+

, и я хотел бы превратить его в этот. Мы можем предположить, что все типы «Один» будут иметь одну и ту же схему, и все типы событий будут иметь одни и те же данные, такие как запись «а», которую я хотел бы отобразить в своем собственном столбце

+---+--------------------------------+--------+------+
| a |  data                          |   y    |   z  |
+---+--------------------------------+--------+------+
| 1 |{"b": 2, "c": 3 }               |   "One"|   001|
| 6 |{"d": 2, "f": 8, "g": 10}       |   "Two"|   089|
| 3 |{"b": 4, "c": 6 }               | "Three"|   123|
+------------------------------------+--------+------+

Ответы [ 2 ]

2 голосов
/ 25 января 2020

Одним из способов достижения этого является обработка данных json в виде карты, как показано ниже:

import org.apache.spark.sql.types.{MapType, StringType, IntegerType}
import org.apache.spark.sql.functions.{from_json, expr}

val df = Seq(
  ("{\"a\": 1, \"b\": 2, \"c\": 3 }", "One", "001") ,
  ("{\"a\": 6, \"b\": 2, \"d\": 2, \"f\": 8 }", "Two", "089"), 
  ("{\"a\": 3, \"b\": 4, \"c\": 6 }", "One", "123")
).toDF("body", "type", "id")

val mapSchema = MapType(StringType, IntegerType)

df.withColumn("map", from_json($"body", mapSchema))
  .withColumn("data_keys", expr("filter(map_keys(map), k -> k != 'a')"))
  .withColumn("data_values", expr("transform(data_keys, k -> element_at(map,k))"))
  .withColumn("data", expr("to_json(map_from_arrays(data_keys, data_values))"))
  .withColumn("a", $"map".getItem("a"))
  .select($"a", $"data", $"type".as("y"), $"id".as("z"))
  .show(false)

// +---+-------------------+---+---+
// |a  |data               |y  |z  |
// +---+-------------------+---+---+
// |1  |{"b":2,"c":3}      |One|001|
// |6  |{"b":2,"d":2,"f":8}|Two|089|
// |3  |{"b":4,"c":6}      |One|123|
// +---+-------------------+---+---+

Анализ

  1. withColumn("map", from_json($"body", mapSchema)): сначала создайте карту из данных json.
  2. withColumn("data_keys", expr("filter(map_keys(map), k -> k != 'a')")): получить ключи новой карты, отфильтровав ключи, не равные a. Мы используем здесь функцию filter , которая возвращает массив, например {"a": 1, "b": 2, "c": 3 } -> [b, c].
  3. withColumn("data_values", expr("transform(data_keys, k -> element_at(map,k))")): заполняет значения новой карты, используя предыдущие ключи в сочетании с transform .
  4. withColumn("data", expr("to_json(map_from_arrays(data_keys, data_values))")): сгенерировать карту из data_keys и data_values с использованием map_from_arrays . Наконец, позвоните to_json для преобразования карты обратно в json.
0 голосов
/ 25 января 2020

Сначала вам нужно определить схему json следующим образом:

val schema = spark.read.json(df.select("col1").as[String]).schema

Затем вы можете преобразовать свой столбец col1 в json (1-я строка), а затем просто выбрать, какие варианты из json вы хотите извлечь (2-я строка):

df.select(from_json($"col1", schema).as("data"), $"col2", $"col3")
.select($"data.a", $"data", $"col2", $"col3")

Вывод:

+---+-------------+----+----+
|  a|         data|col2|col3|
+---+-------------+----+----+
|  1|  [1, 2, 3,,]| One| 001|
|  6|[6, 2,, 2, 8]| Two| 089|
|  3|  [3, 4, 6,,]| One| 123|
+---+-------------+----+----+

Я знаю, что это не совсем то, что вы хотите, но это даст вам подсказку.

Другой вариант, если вы хотите полностью деконструировать вашу json, вы можете использовать данные. *

    df.select(from_json($"col1", schema).as("data"), $"col2", $"col3").select($"data.*", $"col2", $"col3")

+---+---+----+----+----+----+----+
|  a|  b|   c|   d|   f|col2|col3|
+---+---+----+----+----+----+----+
|  1|  2|   3|null|null| One| 001|
|  6|  2|null|   2|   8| Two| 089|
|  3|  4|   6|null|null| One| 123|
+---+---+----+----+----+----+----+
...