Spark - сопоставить плоский фрейм данных с настраиваемой вложенной схемой json - PullRequest
0 голосов
/ 25 апреля 2019

У меня плоский фрейм данных с 5-6 столбцами. Я хочу вложить их и преобразовать во вложенный массив данных, чтобы потом можно было записать их в формат паркета.

Однако я не хочу использовать case-классы, так как я стараюсь сделать код максимально настраиваемым. Я застрял с этой частью и мне нужна помощь.

Мой вклад:

ID ID-2 Count(apple) Count(banana) Count(potato) Count(Onion)

 1  23    1             0             2             0

 2  23    0             1             0             1

 2  29    1             0             1             0

Мой вывод:

ROW 1:

{
  "id": 1,
  "ID-2": 23,
  "fruits": {
    "count of apple": 1,
    "count of banana": 0
  },
  "vegetables": {
    "count of potato": 2,
    "count of onion": 0
  }
} 

Я попытался использовать функцию map в кадре данных spark, где я сопоставляю свои значения с классом case. Тем не менее, я поиграюсь с названиями полей и тоже могу их поменять.

Я не хочу поддерживать класс case и сопоставлять строки с именами столбцов sql, так как это повлечет за собой изменения кода каждый раз.

Я думал о том, чтобы сохранить Hashmap с именами столбцов, которые я хочу сохранить с именами столбцов информационного кадра. Например, в этом примере я сопоставляю «Count (apple)» с «count of apple». Тем не менее, я не могу придумать простой способ передать мою схему как конфигурацию, а затем отобразить ее в своем коде

Ответы [ 3 ]

2 голосов
/ 25 апреля 2019

Вот один подход, использующий тип scala Map для создания сопоставлений столбцов с использованием следующего набора данных:

val data = Seq(
(1, 23, 1, 0, 2, 0),
(2, 23, 0, 1, 0, 1),
(2, 29, 1, 0, 1, 0)).toDF("ID", "ID-2", "count(apple)", "count(banana)", "count(potato)", "count(onion)")

Сначала мы объявляем сопоставления, используя scala.collection.immutable.Map collection и функцию, которая отвечает за сопоставление:

import org.apache.spark.sql.{Column, DataFrame}

val colMapping = Map(
        "count(banana)" -> "no of banana", 
        "count(apple)" -> "no of apples", 
        "count(potato)" -> "no of potatos", 
        "count(onion)" -> "no of onions")

def mapColumns(colsMapping: Map[String, String], df: DataFrame) : DataFrame = {
       val mapping = df
         .columns
         .map{ c => if (colsMapping.contains(c)) df(c).alias(colsMapping(c)) else df(c)}
         .toList

        df.select(mapping:_*)
}

Функция выполняет итерации по столбцам данного кадра данных и идентифицирует столбцы, имеющие общие ключи, с mapping. Затем он возвращает столбец, меняющий свое имя (с псевдонимом) в соответствии с примененными сопоставлениями.

Выход mapColumns(colMapping, df).show(false):

+---+----+------------+------------+-------------+------------+
|ID |ID-2|no of apples|no of banana|no of potatos|no of onions|
+---+----+------------+------------+-------------+------------+
|1  |23  |1           |0           |2            |0           |
|2  |23  |0           |1           |0            |1           |
|2  |29  |1           |0           |1            |0           |
+---+----+------------+------------+-------------+------------+

Наконец, мы производим фрукты и овощи с помощью struct типа:

df1.withColumn("fruits", struct(col(colMapping("count(banana)")), col(colMapping("count(apple)"))))
.withColumn("vegetables", struct(col(colMapping("count(potato)")), col(colMapping("count(onion)"))))
.drop(colMapping.values.toList:_*)
.toJSON
.show(false)

Обратите внимание, что мы отбрасываем все столбцы коллекции colMapping после завершения преобразований.

Выход:

+-----------------------------------------------------------------------------------------------------------------+
|value                                                                                                            |
+-----------------------------------------------------------------------------------------------------------------+
|{"ID":1,"ID-2":23,"fruits":{"no of banana":0,"no of apples":1},"vegetables":{"no of potatos":2,"no of onions":0}}|
|{"ID":2,"ID-2":23,"fruits":{"no of banana":1,"no of apples":0},"vegetables":{"no of potatos":0,"no of onions":1}}|
|{"ID":2,"ID-2":29,"fruits":{"no of banana":0,"no of apples":1},"vegetables":{"no of potatos":1,"no of onions":0}}|
+-----------------------------------------------------------------------------------------------------------------+
0 голосов
/ 25 апреля 2019
val df = spark.sqlContext.read.option("header","true").csv("/sampleinput.txt")

val df1 = df.withColumn("fruits",struct("Count(apple)","Count(banana)") ).withColumn("vegetables",struct("Count(potato)","Count(Onion)")).groupBy("ID","ID-2").agg(collect_list("fruits") as "fruits",collect_list("vegetables") as "vegetables").toJSON 

df1.take(1)

Выход:

{"ID":"2","ID-2":"23","fruits":[{"Count(apple)":"0","Count(banana)":"1"}],"vegetables":[{"Count(potato)":"0","Count(Onion)":"1"}]}
0 голосов
/ 25 апреля 2019

:: (двойное двоеточие) в scala рассматривается как "минусы" в списках scala. Это способ создания списка Scala или вставки элемента в существующий изменяемый список.

scala> val aList = 24 :: 34 :: 56 :: Nil
aList: List[Int] = List(24, 34, 56)

scala> 99 :: aList
res3: List[Int] = List(99, 24, 34, 56)

В первом примере Nil является пустым списком и рассматривается как хвост для самой правой операции cons.

Однако

scala> val anotherList = 23 :: 34
<console>:12: error: value :: is not a member of Int
       val anotherList = 23 :: 34

Это приводит к ошибке, потому что нет существующего списка для вставки.

...