Сохранить схему чтения файла в файл CSV в Spark scala - PullRequest
2 голосов
/ 07 мая 2020

Я читаю файл csv, используя параметр inferschema, включенный во фрейме данных, используя команду ниже.

df2 = spark.read.options(Map("inferSchema"->"true","header"->"true")).csv("s3://Bucket-Name/Fun/Map/file.csv")
df2.printSchema()

Output:

root
 |-- CC|Fun|Head|Country|SendType: string (nullable = true)

Теперь я хотел бы сохранить приведенный выше вывод только в файл csv, имеющий только этот столбец имена и тип данных этих столбцов, как показано ниже.

column_name,datatype
CC,string
Fun,string
Head,string
Country,string
SendType,string

Я попытался записать это в CSV, используя параметр ниже, но это записывает файл со всеми данными.

df2.coalesce(1).write.format("csv").mode("append").save("schema.csv")

касаемо махи

Ответы [ 3 ]

1 голос
/ 07 мая 2020

df.schema.fields, чтобы получить поля и их тип данных.

Проверьте код ниже.

scala> val schema = df.schema.fields.map(field => (field.name,field.dataType.typeName)).toList.toDF("column_name","datatype")
schema: org.apache.spark.sql.DataFrame = [column_name: string, datatype: string]

scala> schema.show(false)
+---------------+--------+
|column_name    |datatype|
+---------------+--------+
|applicationName|string  |
|id             |string  |
|requestId      |string  |
|version        |long    |
+---------------+--------+


scala> schema.write.format("csv").save("/tmp/schema")

0 голосов
/ 07 мая 2020

Альтернативой решениям @ QuickSilver и @Srinivas, с которыми они оба должны работать, является использование DDL представления схемы. С df.schema.toDDL вы получаете:

CC STRING, fun STRING, Head STRING, Country STRING, SendType STRING

, которое является строковым представлением схемы, затем вы можете разделить и заменить, как показано ниже:

import java.io.PrintWriter

val schema = df.schema.toDDL.split(",")
// Array[String] = Array(`CC` STRING, `fun` STRING, `Head` STRING, `Country` STRING, `SendType` STRING)

val writer = new PrintWriter("/tmp/schema.csv")

writer.write("column_name,datatype\n")
schema.foreach{ r => writer.write(r.replace(" ", ",") + "\n") }
writer.close()

To напишите в S3, вы можете использовать Had oop API как уже реализованный QuickSilver или стороннюю библиотеку, такую ​​как MINIO :

import io.minio.MinioClient

val minioClient = new MinioClient("https://play.min.io", "ACCESS_KEY", "SECRET_KEY")

minioClient.putObject("YOUR_BUCKET","schema.csv", "/tmp/schema.csv", null)

Или даже лучше , создавая строку, сохраняя ее в буфере и затем отправляя через InputStream на S3:

import java.io.ByteArrayInputStream
import io.minio.MinioClient

val minioClient = new MinioClient("https://play.min.io", "ACCESS_KEY", "SECRET_KEY")

val schema = df.schema.toDDL.split(",")
val schemaBuffer = new StringBuilder

schemaBuffer ++= "column_name,datatype\n"
schema.foreach{ r => schemaBuffer ++= r.replace(" ", ",") + "\n" }

val inputStream = new ByteArrayInputStream(schemaBuffer.toString.getBytes("UTF-8"))

minioClient.putObject("YOUR_BUCKET", "schema.csv", inputStream, new PutObjectOptions(inputStream.available(), -1))

inputStream.close
0 голосов
/ 07 мая 2020

Попробуйте что-то вроде ниже, используйте coalesce(1) и .option("header","true") для вывода с заголовком

import java.io.FileWriter

object SparkSchema {

  def main(args: Array[String]): Unit = {

    val fw = new FileWriter("src/main/resources/csv.schema", true)
    fw.write("column_name,datatype\n")

    val spark = Constant.getSparkSess

    import spark.implicits._

    val df = List(("", "", "", 1l)).toDF("applicationName", "id", "requestId", "version")
    val columnList : List[(String, String)] = df.schema.fields.map(field => (field.name, field.dataType.typeName))
      .toList
    try {
      val outString = columnList.map(col => {
        col._1 + "," + col._2
      }).mkString("\n")
      fw.write(outString)
    }
    finally fw.close()

    val newColumnList : List[(String, String)] = List(("newColumn","integer"))

    val finalColList = columnList ++ newColumnList
    writeToS3("s3://bucket/newFileName.csv",finalColList)

  }

  def writeToS3(s3FileNameWithpath : String,finalColList : List[(String,String)]) {

    val outString =  finalColList.map(col => {
      col._1 + "," + col._2
    }).mkString("\\n")

    import org.apache.hadoop.fs._
    import org.apache.hadoop.conf.Configuration
    val conf = new Configuration()
    conf.set("fs.s3a.access.key", "YOUR ACCESS KEY")
    conf.set("fs.s3a.secret.key", "YOUR SECRET KEY")

    val dest = new Path(s3FileNameWithpath)
    val fs = dest.getFileSystem(conf)
    val out = fs.create(dest, true)
    out.write( outString.getBytes )
    out.close()
  }

}


...