В pyspark, как разделитьBy части значения определенного столбца при использовании df.write.partitionBy (..). Save? - PullRequest
2 голосов
/ 25 июня 2019

Я хочу разбить дату вместо моего времени в моем Spark Dataframe, как я могу это сделать?

Предположим, у меня есть дата-кадр с первым столбцом в качестве даты и времени, например '2019-06-25 12:00:00 ',' 2019-06-25 11:00:00 'и т. Д. Я знаю, как разделить его по времени, но не знаю, как разделить его по дате.

spark = SparkSession.builder.getOrCreate()

df = spark.sparkContext.parallelize([
    Row("2019-06-25 12:00:00", "2"), Row("2019-06-25 11:00:00", "a"),
    Row("2019-06-24 02:03:10", "2"), Row("2019-06-22 08:00:00", "b"),
    Row("2019-03-12 08:01:34", "3")]).toDF(["datetime", "val"])
+-------------------+---+
|               date|val|
+-------------------+---+
|2019-06-25 12:00:00|  2|
|2019-06-25 11:00:00|  a|
|2019-06-24 02:03:10|  2|
|2019-06-22 08:00:00|  b|
|2019-03-12 08:01:34|  3|
+-------------------+---+

Я хочу использовать такие методы, как

df.write.partitionBy(substr('datetime', 10)).save(path='...', mode='...')

, чтобы достичь своей цели, но, очевидно, вышеописанное не сработает.

1 Ответ

2 голосов
/ 25 июня 2019

В такой ситуации вы можете просто добавить новый столбец на основе вашего поля «datetime», скажем, «date_only»

Фрагмент кода будет выглядеть как

1) Зарегистрироватьсяваш Dataframe из исходного SQL или любой плоской файловой системы и т. д. В этом случае давайте рассмотрим следующую последовательность:

df = spark.sparkContext.parallelize([
    ("2019-06-25 12:00:00", "2"), ("2019-06-25 11:00:00", "a"),
    ("2019-06-24 02:03:10", "2"), ("2019-06-22 08:00:00", "b"),
    ("2019-03-12 08:01:34", "3")]).toDF(["datetime", "val"])

2) Подготовка нового Dataframe из исходного, это позволит вам получить новыйстолбец и существующий столбец не будет удален из ваших результирующих файлов в разделах.

from pyspark.sql import functions as func
partitioned_df = df.withColumn("date_only", func.to_date(func.col("datetime")))

3) Сохранение данных в режиме добавления в эти разделы.

partitioned_df.write.partitionBy('date_only').save(path='dbfs:/FileStore/tables/Temp', mode='append')

4)Я попробовал следующее в Databricks, и таксономия выглядит следующим образом:

Databricks File System - LS result

5) Также файл Parquet (Snappy) содержит такой результат:

Sample file Partitioned in Parquet format

Пожалуйста, дайте мне знать, если это решит вашу проблему.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...