Преобразовать вложенную искру Row в вложенную карту Map [String, Any] (и наоборот) - PullRequest
0 голосов
/ 11 января 2019

Хотите функции

row предполагается как вложенная структура. Ключи выходной карты должны быть полными путями ключей поля (/ имена столбцов). Например. если входная структура {foo: { bar: 1}, bob: "alice"}, тогда выходная карта должна быть Map("foo" -> Map("foo.bar" -> 1))

def rowToMap(row: Row): Map[String, Any]

Надеюсь, что есть хороший способ сделать это, если нет необходимости использовать рекурсию на row.schema.

Аналогично, для вложенной карты, например, Map("foo" -> Map("bar" -> 1), "bob" -> "alice") (обратите внимание, нам не нужно анализировать полный путь).

def mapToRow(map: Map[String, Any]): Row

1 Ответ

0 голосов
/ 11 апреля 2019
object RDDOfMapToDataFrame {
  def apply(rdd: RDD[Map[String, Any]], schema: StructType)
           (implicit sparkSession: SparkSession): DataFrame =
    sparkSession.createDataFrame(rdd.map(mapToRow(_, schema.structType)), schema)

  def getStructTypeFromStructType(field: String, schema: StructType): StructType =
    schema.fields(schema.fieldIndex(field)).dataType.asInstanceOf[StructType]

  def getStructTypeFromArrayType(field: String, schema: StructType): StructType =
    schema.fields(schema.fieldIndex(field)).dataType.asInstanceOf[ArrayType].elementType.asInstanceOf[StructType]

  def mapToRow(m: Map[String, Any], schema: StructType): Row = Row.fromSeq(m.toList.map {
    case (key, struct: Map[String, Any]@unchecked) =>
      schema.fieldIndex(key) -> mapToRow(struct, getStructTypeFromStructType(key, schema))
    // Intellij is confused by this line, please leave as is
    case (key, mapList) if mapList.isInstanceOf[TraversableOnce[_]]
      && mapList.asInstanceOf[TraversableOnce[Any]].toSeq.headOption.exists(_.isInstanceOf[Map[_, _]]) =>
      schema.fieldIndex(key) ->
        mapList.asInstanceOf[TraversableOnce[Any]]
          .toSeq
          .map(_.asInstanceOf[Map[String, Any]])
          .map(mapToRow(_, getStructTypeFromArrayType(key, schema)))
    case (key, None) =>
      schema.fieldIndex(key) -> null
    case (key, Some(other: Map[_, _])) =>
      schema.fieldIndex(key) -> mapToRow(other.asInstanceOf[Map[String, Any]], getStructTypeFromStructType(key, schema))
    case (key, Some(mapList))
      if mapList.isInstanceOf[TraversableOnce[_]]
        && mapList.asInstanceOf[TraversableOnce[Any]].toSeq.headOption.exists(_.isInstanceOf[Map[_, _]]) =>
      schema.fieldIndex(key) ->
        mapList.asInstanceOf[TraversableOnce[Any]]
          .toSeq
          .map(_.asInstanceOf[Map[String, Any]])
          .map(mapToRow(_, getStructTypeFromArrayType(key, schema)))
    case x@(key, Some(other)) =>
      schema.fieldIndex(key) -> other
    case (key, other) =>
      schema.fieldIndex(key) -> other
  }.sortBy(_._1).map(_._2))

  def rowToMap(row: Row): Map[String, Any] = row.schema.fieldNames.zip(row.toSeq.map {
    case row: Row => rowToMap(row)
    // Intellij is confused by this line, please leave as is
    case seqOfRow@((_: Row) :: _) => seqOfRow.map(_.asInstanceOf[Row]).map(rowToMap)
    case any => any
  }).toMap
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...