Объединить несколько файлов JSON в один файл JSON и файл паркета - PullRequest
1 голос
/ 01 мая 2020

Местоположение источника S3 с сотнями JSON

  1. Все JSON файлы должны быть объединены в один JSON файл. т.е. не part-0000... файлы
  2. Выходной файл JSON должен заменить все эти файлы в исходном местоположении S3
  3. Тот же файл JSON должен быть преобразован в паркет и сохранен в другом месте S3

Есть ли лучший вариант, кроме ниже,

  1. Загрузите файл JSON в Dataframe
  2. Сохраните его на локальном диске
  3. Загрузка объединенного файла JSON в S3
  4. Очистка остальных файлов S3 после успешной загрузки объединенного файла S3 с использованием AWS API клиента SDK
  5. Это выполняется параллельно с 4 ... сохранить файл паркета в паркет S3 с помощью API-фреймов данных

У меня возник вопрос ниже по поводу дизайна выше

  • Есть ли более надежный способ сделать это?
  • Могу ли я читать и писать в том же месте S3 и пропустить шаг №. 2.

Ответы [ 2 ]

1 голос
/ 01 мая 2020

да, можно пропустить # 2. Запись в то же место может быть сделано с SaveMode.Overwrite в том же месте, откуда вы читаете.

при первом чтении json, т. Е. № 1 в качестве информационного кадра, оно будет в памяти, если вы выполните кеширование. После этого вы можете выполнить очистку и объединить все json в единое целое с объединением и сохранить в файле паркета за один шаг. что-то вроде этого примера.
Случай 1: все jsons находятся в разных папках, и вы хотите, чтобы они сохраняли окончательный кадр данных как паркет в том же месте, где есть jsons ...

val dfpath1 = spark.read.json("path1")
val dfpath2 =  spark.read.json("path2")
val dfpath3 =  spark.read.json("path3")

val df1 = cleanup1 function dfpath1 returns dataframe
val df2 = cleanup2 function dfpath2 returns dataframe
val df3 = cleanup3 function dfpath3 returns dataframe

val dfs = Seq(df1, df2, df3)
val finaldf = dfs.reduce(_ union _) // you should have same schema while doing union..


  finaldf.write.mode(SaveMode.Overwrite).parquet("final_file with samelocations json.parquet")

Случай 2 : все файлы jsons находятся в одних и тех же папках, и вы хотите, чтобы они сохраняли окончательный кадр данных в виде нескольких паркетов в одном root месте, где есть jsons ...

В этом случае нет необходимости читать как несколько фреймов данных, вы можете указать root путь, где есть jsons с той же схемой

val dfpath1 = spark.read.json("rootpathofyourjsons with same schema")

// or you can give multiple paths spark.read.json("path1","path2","path3")
 // since it s supported by spark dataframe reader like this ...def json(paths: String*):
val finaldf = cleanup1 function returns  dataframe
finaldf.write.mode(SaveMode.Overwrite).parquet("final_file with sameroot locations json.parquet")

AFAIK, в любом случае aws s3 sdk api is больше не требуется.

ОБНОВЛЕНИЕ: Рег. Файл не найден Исключение, с которым вы столкнулись ... см. Пример кода, как это сделать. Я процитировал тот же пример, который вы мне показали здесь

import org.apache.spark.sql.functions._
  val df = Seq((1, 10), (2, 20), (3, 30)).toDS.toDF("sex", "date")

  df.show(false)

  df.repartition(1).write.format("parquet").mode("overwrite").save(".../temp") // save it
  val df1 = spark.read.format("parquet").load(".../temp") // read back again

 val df2 = df1.withColumn("cleanup" , lit("Quick silver want to cleanup")) // like you said you want to clean it.

  //BELOW 2 ARE IMPORTANT STEPS LIKE `cache` and `show` forcing a light action show(1) with out which FileNotFoundException will come.

  df2.cache // cache to avoid FileNotFoundException
  df2.show(2, false) // light action to avoid FileNotFoundException
   // or println(df2.count) // action

   df2.repartition(1).write.format("parquet").mode("overwrite").save(".../temp")
  println("quick silver saved in same directory where he read it from final records he saved after clean up are  ")
  df2.show(false)

Результат:

+---+----+
|sex|date|
+---+----+
|1  |10  |
|2  |20  |
|3  |30  |
+---+----+

+---+----+----------------------------+
|sex|date|cleanup                     |
+---+----+----------------------------+
|1  |10  |Quick silver want to cleanup|
|2  |20  |Quick silver want to cleanup|
+---+----+----------------------------+
only showing top 2 rows

quick silver saved in same directory where he read it from final records he saved after clean up are  
+---+----+----------------------------+
|sex|date|cleanup                     |
+---+----+----------------------------+
|1  |10  |Quick silver want to cleanup|
|2  |20  |Quick silver want to cleanup|
|3  |30  |Quick silver want to cleanup|
+---+----+----------------------------+


Снимок экрана: файл сохранен, очистка при повторном чтении очищена и снова сохранена:

enter image description here

Примечание. Необходимо реализовать кейс 1 или кейс 2 , как предложенное обновление выше ...

0 голосов
/ 05 мая 2020
spark.read
                  .json(sourcePath)
                  .coalesce(1)
                  .write
                  .mode(SaveMode.Overwrite)
                  .json(tempTarget1)

                val fs = FileSystem.get(new URI(s"s3a://$bucketName"), sc.hadoopConfiguration)

                val deleted = fs
                  .delete(new Path(sourcePath + File.separator), true)
                logger.info(s"S3 folder path deleted=${deleted} sparkUuid=$sparkUuid path=${sourcePath}")

                val renamed = fs
                  .rename(new Path(tempTarget1),new Path(sourcePath))

Попробовал и не смог,

  1. Кэширование / сохранение данных на фрейме не работало, так как всякий раз, когда я пытался записать, cachedDf.write возвращался, чтобы проверить файл S3, который я ранее вручную чистил write.
  2. Запись Dataframe напрямую в тот же каталог S3 не работает, так как Dataframe переопределяет только файл, который разбит на разделы, т.е. файл, начинающийся с 'part-00 ...'.
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...