По сути, вы должны создать выражение Column
, которое приведёт ваш ввод к типу с очищенными именами полей. Для этого вы можете использовать функцию org.apache.spark.sql.functions.struct
, которая позволяет комбинировать другие Column
для построения столбца структурного типа. Примерно так должно работать:
import org.apache.spark.sql.{functions => f}
def sanitizeName(s: String): String = s.replace(" ", "_")
def sanitizeFieldNames(st: StructType, context: String => Column): Column = f.struct(
st.fields.map { sf =>
val sanitizedName = sanitizeName(sf.name)
val sanitizedField = sf.dataType match {
case struct: StructType =>
val subcontext = context(sf.name)
sanitizeFieldNames(struct, subcontext(_))
case _ => context(sf.name)
}
sanitizedField.as(sanitizedName)
}: _*
)
Вы используете это так:
val df: DataFrame = ...
val appFieldType = df.schema("app").asInstanceOf[StructType] // or otherwise obtain the field type
df.withColumn(
"app",
sanitizeFieldNames(appFieldType, df("app")(_))
)
Для вашего типа эта рекурсивная функция будет возвращать столбец типа
f.struct(
df("app")("environment").as("environment"),
df("app")("name").as("name"),
f.struct(
df("app")("type")("word tier").as("word_tier"),
df("app")("type")("level").as("level")
).as("type")
)
, который затем присваивается полю «app», заменяя то, что там присутствует.
Однако у этого решения есть ограничение. Он не поддерживает вложенные массивы или карты: если у вас есть схема со структурами внутри массивов или карт, этот метод не будет преобразовывать какие-либо структуры внутри массивов и карт. При этом в Spark 2.4 они добавили функции, которые выполняют операции с коллекциями, поэтому возможно, что в Spark 2.4 эта функция может быть обобщена для поддержки вложенных массивов и карт.
Наконец, можно сделать то, что вы хотите с mapPartitions
. Сначала вы пишете рекурсивный метод, который очищает только StructType
вашего поля:
def sanitizeType(dt: DataType): DataType = dt match {
case st: StructType => ... // rename fields and invoke recursively
case at: ArrayType => ... // invoke recursively
case mt: MapType => ... // invoke recursively
case _ => dt // simple types do not have anything to sanitize
}
Во-вторых, вы применяете очищенную схему к вашему фрейму данных. Есть два основных способа сделать это: безопасный mapPartitions
и один, использующий внутренний Spark API.
С mapPartitions
это просто:
df.mapPartitions(identity)(RowEncoder(sanitizeType(df.schema)))
Здесь мы применяем операцию mapPartitions
и явно указываем выходной кодер. Помните, что схемы в Spark не свойственны данным: они всегда связаны с конкретным фреймом данных. Все данные внутри фрейма данных представлены в виде строк без меток на отдельных полях, только позиции. Пока ваша схема имеет одинаковые типы на тех же позициях (но с потенциально разными именами), она должна работать так, как вы ожидаете.
mapPartitions
приводит к нескольким дополнительным узлам в логическом плане. Чтобы избежать этого, можно создать экземпляр Dataset[Row]
напрямую с помощью специального кодера:
new Dataset[Row](df.sparkSession, df.queryExecution.logical, RowEncoder(sanitizeType(df.schema)))
Это позволит избежать ненужных mapPartitions
(что, как правило, приводит к шагам deserialize-map-serialize в плане выполнения запроса), но это может быть небезопасно; Я лично не проверял это сейчас, но это могло бы работать на вас.