Возможен ли pyspark для чтения из таблицы в S3, обработки данных и сохранения в той же папке? - PullRequest
0 голосов
/ 17 декабря 2018

я хочу объединить некоторые данные в папке на s3 и сохранить данные (объединенные) в том же каталоге.Возможно ли это?

Я уже пробовал:

DF1.write.mode("overwrite").format("parquet").partitionBy("month").save("s3://path/db/table/")

Но кажется, что каталог очищается, прежде чем его можно прочитать и объединить.

Большое спасибо.

Ответы [ 2 ]

0 голосов
/ 19 декабря 2018

Можно, но вам придется кэшировать исходный кадр данных перед перезаписью, иначе отложенная загрузка данных из паркета вызовет проблемы при записи в то же место.

Простой пример

val sourceFile = "/tmp/testoverwrite/A"

val init = List(("A", 1), ("B", 1), ("C", 1)).toDF("X", "count")
init.write.mode(SaveMode.Overwrite).parquet(sourceFile)

val rand = Random

(0 to 3).foreach{_ =>
    val A = spark.read.parquet(sourceFile).cache()
    val _ = A.count() // Trigger cache

    val B = (0 to 4).map(_ =>((rand.nextInt(10) + 65).toChar.toString, 1)).toDF("X", "count")
    A.union(B).groupBy('X).agg(sum('count).as("count"))
      .write.mode(SaveMode.Overwrite).parquet(sourceFile)

    A.unpersist(true)
}

val A = spark.read.parquet(sourceFile).show()
0 голосов
/ 17 декабря 2018

Существуют различные режимы, которые можно использовать для сохранения данных, такие как добавление: добавление данных, перезапись: перезапись данных и т. Д.

Вы можете найти больше информации о режимах здесь для pyspark:

https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter.save

Однако, если вышеприведенная информация не полезна, тогда, если вы также можете уточнить немного, чтоВы имеете в виду под «консолидированными» данными, может помочь какой-то пример.

С уважением,

Neeraj

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...