Сглаживание типов структур в Scala - PullRequest
2 голосов
/ 31 марта 2019

Я пытаюсь создать список из структурного типа во фрейме Spark Data. Схема выглядит примерно так

root
|
|-- plotList: array (nullable = true)
|    |-- element: string (containsNull = true)
|-- plot: struct (nullable = true)
|    |-- test: struct (nullable = true)
|    |    |-- body: string (nullable = true)
|    |    |-- colorPair: struct (nullable = true)
|    |    |    |-- background: string (nullable = true)
|    |    |    |-- foreground: string (nullable = true)
|    |    |-- eta: struct (nullable = true)
|    |    |    |-- etaText: string (nullable = true)
|    |    |    |-- etaType: string (nullable = true)
|    |    |    |-- etaValue: string (nullable = true)
|    |    |-- headline: string (nullable = true)
|    |    |-- plotType: string (nullable = true)
|    |    |-- priority: long (nullable = true)
|    |    |-- plotCategory: string (nullable = true)
|    |    |-- productType: string (nullable = true)
|    |    |-- theme: string (nullable = true)
|    |-- temp: struct (nullable = true)
|    |    |-- body: string (nullable = true)
|    |    |-- colorPair: struct (nullable = true)
|    |    |    |-- background: string (nullable = true)
|    |    |    |-- foreground: string (nullable = true)
|    |    |-- eta: struct (nullable = true)
|    |    |    |-- etaText: string (nullable = true)
|    |    |    |-- etaType: string (nullable = true)
|    |    |    |-- etaValue: string (nullable = true)
|    |    |-- headline: string (nullable = true)
|    |    |-- logo: string (nullable = true)
|    |    |-- plotType: string (nullable = true)
|    |    |-- priority: long (nullable = true)
|    |    |-- plotCategory: string (nullable = true)
|    |    |-- plotType: string (nullable = true)
|    |    |-- theme: string (nullable = true)

Я пытаюсь написать UDF, который может преобразовать столбец plot в список элементов, которые я могу взорвать на следующей итерации. Что-то в строках сюжета -> [test, temp], где я могу выбрать некоторые конкретные столбцы из test и temp. Был бы очень признателен за любые указатели в правильном направлении. Я пробовал несколько вариантов UDF, но ни один из них, похоже, не работает.

Edit:

Я хочу создать сплющенную структуру из подколонок столбца графика. Я думаю об использовании кейсов для этого. Что-то вроде

case class ColorPair(back:String, fore:String)
case class Eta(EtaText: String, EtaType: String, EtaValue: String)
case class Plot(body:String, colorPair: ColorPair, eta: Eta, headline: String, plotType: String, priority: String, plotCategory: String, plotType: String, theme: String)

Итак, по сути, в конце этого я ожидаю что-то вроде List(Plot), которое я могу explode на последующих шагах. Поскольку Explode не работает непосредственно на Struct Types, я должен пройти через это преобразование. В мире питонов я бы легко прочитал этот столбец как словарь, но в Scala ничего такого не существует (о чем я знаю).

1 Ответ

1 голос
/ 01 апреля 2019

Если я правильно понял, вы ищете способ перебрать свою схему, и когда colorPair или eta вернет эти поля следующим образом:

plot.test.colorPair
plot.test.eta
plot.temp.colorPair
plot.temp.eta

Чтобы сгенерировать данные (схему) для вашего случая, я написал следующий код:

  case class Eta(etaText: String, etaType: String, etaValue: String)
  case class ColorPair(background: String, foreground: String)
  case class Test(body: String, colorPair: ColorPair, eta: Eta, headline: String, plotType: String, priority: Long, plotCategory: String, productType: String, theme: String)
  case class Temp(body: String, colorPair: ColorPair, eta: Eta ,headline: String, logo: String, plotType: String, priority: Long, plotCategory: String, productType: String, theme: String)
  case class Plot(test: Test, temp: Temp)
  case class Root(plotList: Array[String], plot: Plot)

  def getSchema(): StructType ={
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.catalyst.ScalaReflection
    val schema = ScalaReflection.schemaFor[Root].dataType.asInstanceOf[StructType]

    schema.printTreeString()
    schema
  }

Это будет иметь вывод:

root
 |-- plotList: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- plot: struct (nullable = true)
 |    |-- test: struct (nullable = true)
 |    |    |-- body: string (nullable = true)
 |    |    |-- colorPair: struct (nullable = true)
 |    |    |    |-- background: string (nullable = true)
 |    |    |    |-- foreground: string (nullable = true)
 |    |    |-- eta: struct (nullable = true)
 |    |    |    |-- etaText: string (nullable = true)
 |    |    |    |-- etaType: string (nullable = true)
 |    |    |    |-- etaValue: string (nullable = true)
 |    |    |-- headline: string (nullable = true)
 |    |    |-- plotType: string (nullable = true)
 |    |    |-- priority: long (nullable = false)
 |    |    |-- plotCategory: string (nullable = true)
 |    |    |-- productType: string (nullable = true)
 |    |    |-- theme: string (nullable = true)
 |    |-- temp: struct (nullable = true)
 |    |    |-- body: string (nullable = true)
 |    |    |-- colorPair: struct (nullable = true)
 |    |    |    |-- background: string (nullable = true)
 |    |    |    |-- foreground: string (nullable = true)
 |    |    |-- eta: struct (nullable = true)
 |    |    |    |-- etaText: string (nullable = true)
 |    |    |    |-- etaType: string (nullable = true)
 |    |    |    |-- etaValue: string (nullable = true)
 |    |    |-- headline: string (nullable = true)
 |    |    |-- logo: string (nullable = true)
 |    |    |-- plotType: string (nullable = true)
 |    |    |-- priority: long (nullable = false)
 |    |    |-- plotCategory: string (nullable = true)
 |    |    |-- productType: string (nullable = true)
 |    |    |-- theme: string (nullable = true)

Наконец, следующий код должен сгладить нужные поля:

def flattenSchema(schema: StructType, targetFields: List[String], prefix: String = null): Array[String]=
  {
    import org.apache.spark.sql.types._
    schema.fields.flatMap(f => {
      val colName = if (prefix == null) f.name else (prefix + "." + f.name)

      f.dataType match {
        case st : StructType =>
          val found = st.filter(s => targetFields.contains(s.name))

          if(found.isEmpty) {
            flattenSchema(st, targetFields, colName)
          }
          else
            found.flatMap(sf => {
              val st = sf.dataType.asInstanceOf[StructType]
              st.map(st => s"${colName}.${sf.name}.${st.name}")
            })

        case _ => Array[String]()
      }
    })
  }

Приведенный выше код сканирует схему, чтобы найти поля, существующие в списке targetFields, а затем использует flatMap для извлечения схемы для этих полей.

Это должнобыть выводом:

plot.test.colorPair.background
plot.test.colorPair.foreground
plot.test.eta.etaText
plot.test.eta.etaType
plot.test.eta.etaValue
plot.temp.colorPair.background
plot.temp.colorPair.foreground
plot.temp.eta.etaText
plot.temp.eta.etaType
plot.temp.eta.etaValue
...