запись данных искрового раздела по метке времени - PullRequest
0 голосов
/ 27 сентября 2018

У меня есть некоторые данные, у которых есть поле столбца метки времени, которое является длинным и его стандарт эпохи, мне нужно сохранить эти данные в разделенном формате, например, гггг / мм / дд / чч, используя искровую шкалу

data.write.partitionBy("timestamp").format("orc").save("mypath") 

это просто разделение данных по отметке времени, как показано ниже

timestamp=1458444061098
timestamp=1458444061198

, но я хочу, чтобы оно было как

└── YYYY
    └── MM
        └── DD
            └── HH

Ответы [ 2 ]

0 голосов
/ 27 сентября 2018

Во-первых, я бы предостерег вас от чрезмерного разделения.То есть убедитесь, что у вас достаточно данных, чтобы их можно было разделить по часам, иначе вы можете получить множество папок с небольшими файлами.Второе предостережение, которое я хотел бы сделать, это использование иерархии разделов (год / месяц / день / час), поскольку для этого потребуется рекурсивное обнаружение разделов.

Сказав это, если вы определенно хотите разделить на сегменты часа, япредложил бы урезать вашу временную метку до часа в новый столбец и разделить ее.Тогда Spark будет достаточно умен, чтобы распознавать формат как метку времени, когда вы будете читать его обратно, и вы сможете выполнить полную фильтрацию по мере необходимости.

input
  .withColumn("ts_trunc", date_trunc("HOUR", 'timestamp)) // date_trunc added in Spark 2.3.0
  .write
  .partitionBy("ts_trunc")
  .save("/mnt/warehouse/part-test")

spark.read.load("/mnt/warehouse/part-test").where("hour(ts_trunc) = 10")

Другой вариант - разделить по дате и часудень как так:

input
  .withColumn("date", to_date('timestamp))
  .withColumn("hour", hour('timestamp))
  .write
  .partitionBy("date", "hour")
  .save("/mnt/warehouse/part-test")
0 голосов
/ 27 сентября 2018

Для этого вы можете использовать различные функции даты / времени.Сначала вы добавляете новый столбец типа даты, созданный из столбца метки времени Unix.

val withDateCol = data
.withColumn("date_col", from_unixtime(col("timestamp", "YYYYMMddHH"))

После этого вы можете добавить столбцы года, месяца, дня и часа в DF, а затем разбить их на эти новые столбцы дляthe write.

withDateCol
.withColumn("year", year(col("date_col")))
.withColumn("month", month(col("date_col")))
.withColumn("day", dayofmonth(col("date_col")))
.withColumn("hour", hour(col("date_col")))
.drop("date_col")
.partitionBy("year", "month", "day", "hour")
.format("orc")
.save("mypath") 

Столбцы, включенные в предложение partitionBy, не будут частью схемы файла.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...