Преобразование MapPartitionsRDD в DataFrame и группирование данных по 2 ключам - PullRequest
2 голосов
/ 26 февраля 2020

У меня есть фрейм данных, который выглядит следующим образом:

  country | user | count
  ----------------------
  Germany | Sarah| 2
  China   | Paul | 1
  Germany | Alan | 3
  Germany | Paul | 1
          ...

Я пытаюсь преобразовать этот фрейм данных в другой, который выглядит следующим образом:

  dimension | value
  --------------------------------------------
  Country   | [Germany -> 4, China -> 1]
  --------------------------------------------
  User      | [Sarah -> 2, Paul -> 2, Alan -> 3]
          ...

Сначала Я попытался сделать это так:

  var newDF = Seq.empty[(String, Map[String,Long])].toDF("dimension", "value")
  df.collect()
    .foreach(row => { Array(0,1)
            .map(pos => 
             newDF = newDF.union(Seq((df.columns.toSeq(pos).toString, Map(row.mkString(",").split(",")(pos) -> row.mkString(",").split(",")(2).toLong))).toDF())
             )
     })
  val newDF2 = newDF.groupBy("dimension").agg(collect_list("value")).as[(String, Seq[Map[String, Long]])].map {case (id, list) => (id, list.reduce(_ |+| _))}.toDF("dimension", "value")

Но collect() убивал моего водителя. Поэтому я попытался сделать это так:

 class DimItem[T](val dimension: String, val value: String, val metric: T) 


 val items: RDD[DimItem[Long]] = df.rdd.flatMap(row => {
                                dims.zipWithIndex.map{case (dim, i) => 
                                                  new DimItem(dim, row(i).toString, row(13).asInstanceOf[Long])
                                                  }
                                })  
 // with the format [ DimItem(Country, Germany, 2), DimItem(User, Sarah, 2)], ...

val itemsGrouped: RDD[((String, String), Iterable[DimItem[Long]])] = items.groupBy(x => (x.dimension, x.value))
val aggregatedItems: RDD[DimItem[Long]] = itemsGrouped.map{case (key, items) => new DimItem(key._1, key._2, items.reduce((a,b) => a.metric + b.metric)}

Идея состоит в том, чтобы сохранить в RDD объекты типа (Страна, Китай, 1), (Страна, Германия, 3), (Страна, Германия , 1), ... и затем сгруппируйте его по двум первым ключам (Страна, Китай), (Страна, Германия), ... После группировки, сложите их количество. Пример: наличие (Страна, Германия, 3), (Страна, Германия, 1) станет (Страна, Германия, 4).

Но как только я сюда попал, это говорит мне, что в items.reduce() есть несоответствие: он ожидает DimItem [Long], но получает Long.

Следующим шагом будет сгруппировать его по ключу «измерение» и создать формат Map[String, Int]() в столбце «значение» и преобразовать его в DF.

У меня есть 2 вопроса.

Первый : правильный ли последний код?

Секунда : Как я могу преобразовать этот MapPartitionsRDD в DF?

1 Ответ

1 голос
/ 27 февраля 2020

Вот одно решение, основанное на API данных:

import org.apache.spark.sql.functions.{lit, map_from_arrays, collect_list}

def transform(df :DataFrame, colName: String) : DataFrame = 
  df.groupBy(colName)
    .agg{sum("count").as("sum")}
    .agg{
      map_from_arrays(
        collect_list(colName),
        collect_list("sum")
      ).as("value")
    }.select(lit(colName).as("dimension"), $"value")

val countryDf = transform(df, "country")
val userDf = transform(df, "user")

countryDf.unionByName(userDf).show(false)

// +---------+----------------------------------+
// |dimension|value                             |
// +---------+----------------------------------+
// |Country  |[Germany -> 6, China -> 1]        |
// |User     |[Sarah -> 2, Alan -> 3, Paul -> 2]|
// +---------+----------------------------------+

Анализ: сначала мы получаем сумму по стране и группировке пользователей по стране и пользователю соответственно. Затем мы добавляем еще одну пользовательскую агрегацию в конвейер, который собирает предыдущие результаты в карту. Карта будет заполняться с помощью функции map_from_arrays , найденной в Spark 2.4.0. Ключи / значения карты мы собираем их с помощью collect_list. Наконец, мы объединяем два кадра данных, чтобы заполнить окончательные результаты.

...