После анализа вашей проблемы я делаю следующие предположения:
1. data source can be anything, primarily HDFS
2. delimiter can be anything
3. you're maintaining structure for each source.
4. file does not contains header
Предложение: здесь проблема в том, что вы должны сгенерировать StructType, если ваши данные не содержат заголовок. Придумайте некоторую структуру, которая может быть структурой json для определения вашего источника данных. Затем загрузите и проанализируйте JSON, используя Jackson, используя Scala. Или просто передайте column_map в вашу программу.
Example:
{
"inputLocation": "",
"delimiter" : ",",
"column_map" : "col1, datatype; col12, datatype;col1, datatype; col12, datatype"
"outputLocation": ""
}
Теперь используйте column_map для динамического создания типа структуры.
object GenerateStructType {
import org.apache.spark.sql.types.{BooleanType, StringType, StructField, StructType}
def generateStrucType(columnsList: Seq[String]): StructType = {
val res=columnsList.map( columnDetail => {
val columnName = columnDetail.split(",")(0).trim
val columnType = columnDetail.split(",")(1).trim
columnType match {
case "String" => StructField(columnName,StringType,true)
case "Bool" => StructField(columnName,BooleanType,true)
case _ => StructField(columnName,StringType,true)
}
})
StructType(res)
}
def main(args: Array[String]): Unit = {
val columnMap= "col1, datatype; col12, datatype;col1, datatype; col12, datatype"
val result= GenerateStructType.generateStrucType( columnMap.split(";"))
println(result)
}
}
динамически сгенерированного StructType :
StructType(StructField(col1,StringType,true), StructField(col12,StringType,true), StructField(col1,StringType,true), StructField(col12,StringType,true))
используйтеТип структуры при загрузке данных.
Надеюсь, это поможет ....