/** Conversions between (possibly nested) `Map[String, Any]` records and Spark [[Row]]s / DataFrames. */
object RDDOfMapToDataFrame {

  /** Builds a DataFrame from an RDD of maps keyed by top-level field name.
    *
    * Each record is converted with [[mapToRow]], so nested structs may be given as `Map`s, arrays of
    * structs as collections of `Map`s, and any value may be wrapped in an `Option`.
    *
    * @param rdd          records to convert
    * @param schema       schema of the resulting DataFrame
    * @param sparkSession session used to create the DataFrame
    * @return a DataFrame with the given `schema`
    */
  def apply(rdd: RDD[Map[String, Any]], schema: StructType)
           (implicit sparkSession: SparkSession): DataFrame =
    // NOTE(review): `structType` is not a member of Spark's StructType; presumably an implicit
    // defined elsewhere in the project provides it — confirm.
    sparkSession.createDataFrame(rdd.map(mapToRow(_, schema.structType)), schema)

  /** Returns the schema of `field`, which must be a struct-typed field of `schema`.
    *
    * @throws IllegalArgumentException if `schema` has no field named `field`
    * @throws ClassCastException       if the field's data type is not a `StructType`
    */
  def getStructTypeFromStructType(field: String, schema: StructType): StructType =
    schema.fields(schema.fieldIndex(field)).dataType.asInstanceOf[StructType]

  /** Returns the element schema of `field`, which must be an array-of-struct field of `schema`.
    *
    * @throws IllegalArgumentException if `schema` has no field named `field`
    * @throws ClassCastException       if the field is not an `ArrayType` of `StructType`
    */
  def getStructTypeFromArrayType(field: String, schema: StructType): StructType =
    schema.fields(schema.fieldIndex(field)).dataType.asInstanceOf[ArrayType].elementType.asInstanceOf[StructType]

  /** Converts a (possibly nested) map into a [[Row]] whose values are laid out in `schema` order.
    *
    * Values are converted recursively:
    *  - `None` becomes `null`; `Some(x)` is unwrapped and `x` converted;
    *  - a nested `Map` becomes a nested `Row` built with that field's struct schema;
    *  - a collection whose first element is a `Map` becomes a `Seq[Row]` built with the field's
    *    array element schema;
    *  - anything else is stored unchanged.
    *
    * Schema fields with no matching key in `m` are set to `null`, so the row always has exactly one
    * value per schema field, each in its own column.
    *
    * @throws IllegalArgumentException if `m` contains a key that is not a field of `schema`
    */
  def mapToRow(m: Map[String, Any], schema: StructType): Row = {
    // Pre-size to the full schema: keys absent from `m` leave a null in their slot instead of
    // shifting every later value into the wrong column.
    val values = new Array[Any](schema.fields.length)
    m.foreach { case (key, value) =>
      values(schema.fieldIndex(key)) = toRowValue(key, value, schema)
    }
    Row.fromSeq(values.toSeq)
  }

  /** Converts the value stored under field `key` of `schema` into its Row representation. */
  private def toRowValue(key: String, value: Any, schema: StructType): Any = value match {
    case None => null
    // Unwrap Option before the collection case: on Scala 2.13 `Option` is itself an IterableOnce.
    case Some(inner) => toRowValue(key, inner, schema)
    // Must precede the collection case, because a Map is also a TraversableOnce.
    case struct: Map[String, Any] @unchecked =>
      mapToRow(struct, getStructTypeFromStructType(key, schema))
    case items: TraversableOnce[_] =>
      // Materialize exactly once: traversing `items` separately for the check and the conversion
      // would consume an Iterator twice and drop elements.
      val elements = items.toSeq
      if (elements.headOption.exists(_.isInstanceOf[Map[_, _]])) {
        val elementSchema = getStructTypeFromArrayType(key, schema)
        elements.map(element => mapToRow(element.asInstanceOf[Map[String, Any]], elementSchema))
      } else if (value.isInstanceOf[Iterator[_]]) {
        elements // the one-shot iterator was consumed above; hand back its materialized contents
      } else {
        value
      }
    case other => other
  }

  /** Converts a [[Row]] back into a map keyed by field name.
    *
    * Nested rows become nested maps and sequences of rows (arrays of structs) become sequences of
    * maps; all other values are returned unchanged.
    *
    * NOTE(review): this reads `row.schema`, so the row must carry a schema (e.g. rows collected from a
    * DataFrame); rows built directly with `Row.fromSeq` appear to have none — confirm before use.
    */
  def rowToMap(row: Row): Map[String, Any] =
    row.schema.fieldNames.zip(row.toSeq.map(fromRowValue)).toMap

  /** Converts one value read from a [[Row]] into its map-based representation. */
  private def fromRowValue(value: Any): Any = value match {
    case nested: Row => rowToMap(nested)
    // Match any Seq, not only List: Spark materializes array columns as WrappedArray / ArraySeq.
    case rows: scala.collection.Seq[_] if rows.headOption.exists(_.isInstanceOf[Row]) =>
      rows.map(fromRowValue)
    case other => other
  }
}