У меня есть фрейм данных, который выглядит следующим образом:
country | user | count
----------------------
Germany | Sarah| 2
China | Paul | 1
Germany | Alan | 3
Germany | Paul | 1
...
Я пытаюсь преобразовать этот фрейм данных в другой, который выглядит следующим образом:
dimension | value
--------------------------------------------
Country | [Germany -> 4, China -> 1]
--------------------------------------------
User | [Sarah -> 2, Paul -> 2, Alan -> 3]
...
Сначала Я попытался сделать это так:
var newDF = Seq.empty[(String, Map[String,Long])].toDF("dimension", "value")
df.collect()
.foreach(row => { Array(0,1)
.map(pos =>
newDF = newDF.union(Seq((df.columns.toSeq(pos).toString, Map(row.mkString(",").split(",")(pos) -> row.mkString(",").split(",")(2).toLong))).toDF())
)
})
val newDF2 = newDF.groupBy("dimension").agg(collect_list("value")).as[(String, Seq[Map[String, Long]])].map {case (id, list) => (id, list.reduce(_ |+| _))}.toDF("dimension", "value")
Но collect()
убивал моего водителя. Поэтому я попытался сделать это так:
class DimItem[T](val dimension: String, val value: String, val metric: T)
val items: RDD[DimItem[Long]] = df.rdd.flatMap(row => {
dims.zipWithIndex.map{case (dim, i) =>
new DimItem(dim, row(i).toString, row(13).asInstanceOf[Long])
}
})
// with the format [ DimItem(Country, Germany, 2), DimItem(User, Sarah, 2)], ...
val itemsGrouped: RDD[((String, String), Iterable[DimItem[Long]])] = items.groupBy(x => (x.dimension, x.value))
val aggregatedItems: RDD[DimItem[Long]] = itemsGrouped.map{case (key, items) => new DimItem(key._1, key._2, items.reduce((a,b) => a.metric + b.metric)}
Идея состоит в том, чтобы сохранить в RDD объекты типа (Страна, Китай, 1), (Страна, Германия, 3), (Страна, Германия , 1), ... и затем сгруппируйте его по двум первым ключам (Страна, Китай), (Страна, Германия), ... После группировки, сложите их количество. Пример: наличие (Страна, Германия, 3), (Страна, Германия, 1) станет (Страна, Германия, 4).
Но как только я сюда попал, это говорит мне, что в items.reduce()
есть несоответствие: он ожидает DimItem [Long], но получает Long.
Следующим шагом будет сгруппировать его по ключу «измерение» и создать формат Map[String, Int]()
в столбце «значение» и преобразовать его в DF.
У меня есть 2 вопроса.
Первый : правильный ли последний код?
Секунда : Как я могу преобразовать этот MapPartitionsRDD в DF?