Можем ли мы автоматизировать создание запросов Spark SQL из схемы AVRO? - PullRequest
1 голос
/ 12 июля 2020

Я работаю над проектом, где каждый день мне нужно иметь дело с тоннами файлов AVRO. Для извлечения данных из AVRO я использую spark SQL. Чтобы добиться этого, мне сначала нужно printSchema, а затем мне нужно выбрать поля для просмотра данных. Я хочу автоматизировать этот процесс. Учитывая любой ввод AVRO, я хочу написать скрипт, который будет автоматически генерировать запрос Spark SQL (с учетом структуры и массивов в файле avs c). Я могу написать сценарий в Java или Python.

- Пример ввода AVRO

root
|-- identifier: struct (nullable = true)
|    |-- domain: string (nullable = true)
|    |-- id: string (nullable = true)
|    |-- version: long (nullable = true)
alternativeIdentifiers: array (nullable = true)
|    |    |-- element: struct (containsNull = true)
|    |    |    |-- identifier: struct (nullable = true)
|    |    |    |    |-- domain: string (nullable = true)
|    |    |    |    |-- id: string (nullable = true)

- Выход I жду

SELECT identifier.domain, identifier.id, identifier.version

1 Ответ

0 голосов
/ 14 июля 2020

Вы можете использовать что-то вроде этого для генерации списка столбцов на основе схемы:

  import org.apache.spark.sql.types.{StructField, StructType}
  def getStructFieldName(f: StructField, baseName: String = ""): Seq[String] = {
    val bname = if (baseName.isEmpty) "" else baseName + "."
    f.dataType match {
      case StructType(s) =>
        s.flatMap(x => getStructFieldName(x, bname + f.name))
      case _ => Seq(bname + f.name)
    }
  }

Затем это можно было бы использовать в реальном фрейме данных, например:

val data = spark.read.json("some_data.json")
val cols = data.schema.flatMap(x => getStructFieldName(x))

в результате мы получаем последовательность строк, которую мы можем использовать либо для выполнения select:

import org.apache.spark.sql.functions.col
data.select(cols.map(col): _*)

, либо мы можем сгенерировать список, разделенный запятыми, который мы можем использовать в spark.sql:

spark.sql(s"select ${cols.mkString(", ")} from table")
...