Разархивируйте несколько файлов * .gz и создайте один CSV-файл в Spark Scala - PullRequest
0 голосов
/ 26 сентября 2018

У меня несколько файлов в корзине S3, и мне нужно распаковать эти файлы и объединить все файлы в один файл (CSV) с одним заголовком.Все файлы содержат одинаковый заголовок.

Файлы данных выглядят как показано ниже.

Система хранения: корзина S3.

 part-0000-XXXX.csv.gz
 part_0001-YYYY.csv.gz
 part-0002-ZZZZ.csv.gz
 .
 .
 .
 .
 part-0010_KKKK.csv.gz.

Я хочу один файл CSV из всехфайлы, как показано выше.Пожалуйста, помогите мне, как распаковать и объединить все файлы.

После распаковки и объединения всех файлов в один CSV, я могу использовать этот файл для сравнения данных с предыдущими файлами ..

IЯ использую spark 2.3.0 и scala 2.11

Большое спасибо.

Ответы [ 2 ]

0 голосов
/ 26 сентября 2018

Ниже упомянутый код работает нормально.

scala> val rdd = sc.textFile("/root/data")
rdd: org.apache.spark.rdd.RDD[String] = /root/data MapPartitionsRDD[1] at textFile at <console>:24


scala> rdd.coalesce(1).saveAsTextFile("/root/combinedCsv", classOf[org.apache.hadoop.io.compress.GzipCodec])

Вы видите, что входные данные находятся в каталоге /root/data, а объединенный csv в формате gzip хранится в каталоге /root/combinedCsv.

Обновление

Если вы хотите сохранить данные в формате csv, удалите часть GzipCodec.

scala> rdd.coalesce(1).saveAsTextFile("/root/combinedCsv")
0 голосов
/ 26 сентября 2018

Вы можете использовать приведенный ниже код, также вы можете напрямую читать из файла gz без распаковки:

val filePath = "/home/harneet/<Dir where all gz/csv files are present>"

var cdnImpSchema = StructType(Array(
 StructField("idate",     TimestampType, true),
 StructField("time",     StringType, true),
 StructField("anyOtherColumn",  StringType, true)
))

var cdnImpDF = spark.read.format("csv").     // Use "csv" regardless of TSV or CSV.
 option("delimiter", ","). // Set delimiter to tab or comma or whatever you want.
 schema(cdnImpSchema).        // Schema that was built above.
 load(filePath)

cdnImpDF.repartition(1).write.format("com.databricks.spark.csv").option("header", "true").save("mydata.csv")

repartition (1) -> Создает один файл в качестве вывода.

...