Как «сплющить» схему Spark с переменным числом столбцов? - PullRequest
0 голосов
/ 26 февраля 2019

Это схема созданного Spark DataFrame:

root
 |-- id: double (nullable = true)
 |-- sim_scores: struct (nullable = true)
 |    |-- scores: map (nullable = true)
 |    |    |-- key: string
 |    |    |-- value: map (valueContainsNull = true)
 |    |    |    |-- key: integer
 |    |    |    |-- value: vector (valueContainsNull = true)

Структура 'sim_scores' представляет класс-случай Scala, который я использую для целей агрегирования.У меня есть обычай UDAF, предназначенный для объединения этих структур.Чтобы сделать их безопасными для слияния для всех крайних случаев, они выглядят так, как они.Предположим, что для этого вопроса они должны оставаться такими.

Я хотел бы «сплющить» этот DataFrame во что-то вроде:

root
 |-- id: double (nullable = true)
 |-- score_1: map (valueContainsNull = true)
 |    |-- key: integer
 |    |-- value: vector (valueContainsNull = true)
 |-- score_2: map (valueContainsNull = true)
 |    |-- key: integer
 |    |-- value: vector (valueContainsNull = true)
 |-- score_3: map (valueContainsNull = true)
 |    |-- key: integer
 |    |-- value: vector (valueContainsNull = true)
...

Внешний MapType в структуре 'Scores'сопоставляет баллы тем с документами;внутренние карты, представляющие документ, отображают положение предложения в документе на векторную оценку.«Score_1», «Score_2», ... представляют все возможные ключи MapType «Scores» в начальном DF.

В терминах json-ish, если бы у меня был ввод, который выглядит следующим образом:

{ "id": 739874.0,
  "sim_scores": {
    "firstTopicName": {
      1: [1,9,1,0,1,1,4,6],
      2: [5,7,8,2,4,3,1,3],
      ...
    },
    "anotherTopic": {
      1: [6,8,4,1,3,4,2,0],
      2: [0,1,3,2,4,5,6,2],
      ...
    }
  }
}

тогда я получу вывод

{ "id": 739874.0,
  "firstTopicName": {
    1: [1,9,1,0,1,1,4,6],
    2: [5,7,8,2,4,3,1,3],
    ...
  }
  "anotherTopic": {
    1: [6,8,4,1,3,4,2,0],
    2: [0,1,3,2,4,5,6,2],
    ...
  }
}

Если бы я знал общее количество столбцов темы, это было бы легко;но я не.Количество тем задается пользователем во время выполнения, выходной DataFrame имеет переменное количество столбцов.Это гарантированно будет> = 1, но мне нужно спроектировать это так, чтобы он мог работать со 100 различными столбцами темы, если это необходимо.

Как я могу реализовать это?

Последнее примечание:Я застрял с использованием Spark 1.6.3;поэтому решения, которые работают с этой версией, являются лучшими.Однако я возьму любой способ сделать это в надежде на дальнейшую реализацию.

1 Ответ

0 голосов
/ 06 марта 2019

На высоком уровне, я думаю, у вас есть два варианта здесь:

  1. Использование API-интерфейса датафрейма
  2. Переключение на RDD

Если выЕсли вы хотите продолжать использовать Spark SQL, тогда вы можете использовать selectExpr и сгенерировать запрос на выборку:

it("should flatten using dataframes and spark sql") {
  val sqlContext = new SQLContext(sc)
  val df = sqlContext.createDataFrame(sc.parallelize(rows), schema)
  df.printSchema()
  df.show()
  val numTopics = 3 // input from user
  // fancy logic to generate the select expression
  val selectColumns: Seq[String] = "id" +: 1.to(numTopics).map(i => s"sim_scores['scores']['topic${i}']")
  val df2 = df.selectExpr(selectColumns:_*)
  df2.printSchema()
  df2.show()
}

Учитывая данные этого примера:

val schema = sql.types.StructType(List(
  sql.types.StructField("id", sql.types.DoubleType, nullable = true),
  sql.types.StructField("sim_scores", sql.types.StructType(List(
    sql.types.StructField("scores", sql.types.MapType(sql.types.StringType, sql.types.MapType(sql.types.IntegerType, sql.types.StringType)), nullable = true)
  )), nullable = true)
))
val rows = Seq(
  sql.Row(1d, sql.Row(Map("topic1" -> Map(1 -> "scores1"), "topic2" -> Map(1 -> "scores2")))),
  sql.Row(2d, sql.Row(Map("topic1" -> Map(1 -> "scores1"), "topic2" -> Map(1 -> "scores2")))),
  sql.Row(3d, sql.Row(Map("topic1" -> Map(1 -> "scores1"), "topic2" -> Map(1 -> "scores2"), "topic3" -> Map(1 -> "scores3"))))
)

Вы получите такой результат:

root
 |-- id: double (nullable = true)
 |-- sim_scores.scores[topic1]: map (nullable = true)
 |    |-- key: integer
 |    |-- value: string (valueContainsNull = true)
 |-- sim_scores.scores[topic2]: map (nullable = true)
 |    |-- key: integer
 |    |-- value: string (valueContainsNull = true)
 |-- sim_scores.scores[topic3]: map (nullable = true)
 |    |-- key: integer
 |    |-- value: string (valueContainsNull = true)

+---+-------------------------+-------------------------+-------------------------+
| id|sim_scores.scores[topic1]|sim_scores.scores[topic2]|sim_scores.scores[topic3]|
+---+-------------------------+-------------------------+-------------------------+
|1.0|        Map(1 -> scores1)|        Map(1 -> scores2)|                     null|
|2.0|        Map(1 -> scores1)|        Map(1 -> scores2)|                     null|
|3.0|        Map(1 -> scores1)|        Map(1 -> scores2)|        Map(1 -> scores3)|
+---+-------------------------+-------------------------+-------------------------+

Другой вариант - переключиться на обработку RDD, где вы можете добавить более мощную логику выравнивания, основанную на ключах на карте.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...