Чтение неоднозначного имени столбца в Spark sql Dataframe с использованием scala - PullRequest
1 голос
/ 03 августа 2020

У меня есть повторяющиеся столбцы в текстовом файле, и когда я пытаюсь загрузить этот текстовый файл с помощью кода spark scala, он успешно загружается во фрейм данных, и я вижу первые 20 строк с помощью df.Show ()

Полный код: -

 val sc = new SparkContext(conf)
 val hivesql = new org.apache.spark.sql.hive.HiveContext(sc)
 val rdd = sc.textFile("/...FilePath.../*")
 val fieldCount = rdd.map(_.split("[|]")).map(x => x.size).first()
 val field = rdd.zipWithIndex.filter(_._2==0).map(_._1).first()
 val fields = field.split("[|]").map(fieldName =>StructField(fieldName, StringType, nullable=true))
 val schema = StructType(fields)
 val rowRDD = rdd.map(_.split("[|]")).map(attributes => getARow(attributes,fieldCount))

val df = hivesql.createDataFrame(rowRDD, schema)
df.registerTempTable("Sample_File")
df.Show()

До этого момента мой код работает нормально. Но как только я пытаюсь ввести код ниже, это дает мне ошибку.

val results = hivesql.sql("Select id,sequence,sequence from Sample_File")

, поэтому у меня есть 2 столбца с одинаковым именем в текстовом файле, т.е. последовательность Как я могу получить доступ к этим двум столбцам .. Я пробовал с последовательностью # 2, но все еще не работает Версия Spark: -1.6.0 Scala Версия: - 2.10.5

result of df.printschema()
|-- id: string (nullable = true)
|-- sequence: string (nullable = true)
|-- sequence: string (nullable = true)

Ответы [ 2 ]

0 голосов
/ 03 августа 2020

Я второй подход @ smart_coder, но у меня немного другой подход. Найдите его ниже.

Для выполнения запроса из куста вам нужны уникальные имена столбцов sql. sql.

вы можете динамически переименовывать имена столбцов, используя приведенный ниже код:

Ваш код:

val df = hivesql.createDataFrame(rowRDD, schema)

После этого нам нужно устранить двусмысленность, решение ниже:

var list = df.schema.map(_.name).toList

for(i <- 0 to list.size -1){
    val cont = list.count(_ == list(i))
    val col = list(i)
    
    if(cont != 1){
        list = list.take(i) ++ List(col+i) ++ list.drop(i+1)
    }
}

val df1 = df.toDF(list: _*)

// вы получите результат, как показано ниже : result of df1.printschema ()

|-- id: string (nullable = true)
|-- sequence1: string (nullable = true)
|-- sequence: string (nullable = true)

Итак, в основном мы получаем все имена столбцов в виде списка, затем проверяем, повторяется ли какой-либо столбец более одного раза, если столбец повторяется, мы добавляя имя столбца с индексом, затем мы создаем новый фрейм данных d1 с новым списком с переименованными именами столбцов.

Я тестировал это в Spark 2.4, но он должен работать и в 1.6.

0 голосов
/ 03 августа 2020

Приведенный ниже код может помочь вам решить вашу проблему. Я тестировал это в Spark 1.6.3.

val sc = new SparkContext(conf)
val hivesql = new org.apache.spark.sql.hive.HiveContext(sc)
val rdd = sc.textFile("/...FilePath.../*")
val fieldCount = rdd.map(_.split("[|]")).map(x => x.size).first()
val field = rdd.zipWithIndex.filter(_._2==0).map(_._1).first()
val fields = field.split("[|]").map(fieldName =>StructField(fieldName, StringType, nullable=true))
val schema = StructType(fields)
val rowRDD = rdd.map(_.split("[|]")).map(attributes => getARow(attributes,fieldCount))

val df = hivesql.createDataFrame(rowRDD, schema)

val colNames = Seq("id","sequence1","sequence2")
val df1 = df.toDF(colNames: _*)

df1.registerTempTable("Sample_File")

val results = hivesql.sql("select id,sequence1,sequence2 from Sample_File")

...