Сохранить Spark DataFrame в HDFS с разбивкой по дате - PullRequest
0 голосов
/ 02 июня 2019

Мне нужно записать данные из фрейма данных Spark в HDFS в формате Avro. Проблема заключается в том, что данные должны сохраняться каждый день, чтобы каталоги выглядели так: tablename / 2019-08-12, tablename / 2019-08-13 и так далее. У меня есть только поле метки времени, из которого мне нужно извлечь дату для создания имен каталогов. Я построил подход, который имеет 2 проблемы: 1) Есть трудности с извлечением даты из отметки времени 3) На большом наборе данных (и он будет больше позже) производительность будет очень плохой, так как запускается много задач. Итак, как я могу изменить / улучшить этот подход?

Вот код, который я использовал (dataDF - входные данные):

val uniqueDates = dataDF.select("update_database_time").distinct.
collect.map(elem => elem.getTimestamp(0).getDate)

    uniqueDates.map(date => {
      val resultDF = dataDF.where(to_date(dataDF.col("update_database_time")) <=> date)
      val pathToSave = s"${dataDir}/${tableNameValue}/${date}"
      dataDF.write
            .format("avro")
            .option("avroSchema", SchemaRegistry.getSchema(
                   schemaRegistryConfig.url,
                   schemaRegistryConfig.dataSchemaSubject,
                   schemaRegistryConfig.dataSchemaVersion))
            .save(s"${hdfsURL}${pathToSave}")
      resultDF
    })
      .reduce(_.union(_))

1 Ответ

1 голос
/ 03 июня 2019

Если вы можете жить с такой структурой каталогов, как

tablename/date=2019-08-12
tablename/date=2019-08-13

вместо этого, тогда DataFrameWriter.partitionBy добивается цели. Например

val df =
  Seq((Timestamp.valueOf("2019-06-01 12:00:00"), 1),
      (Timestamp.valueOf("2019-06-01 12:00:01"), 2),
      (Timestamp.valueOf("2019-06-02 12:00:00"), 3)).toDF("time", "foo")

df.withColumn("date", to_date($"time"))
  .write
  .partitionBy("date")
  .format("avro")
  .save("/tmp/foo")

дает следующую структуру

find /tmp/foo
/tmp/foo
/tmp/foo/._SUCCESS.crc
/tmp/foo/date=2019-06-01
/tmp/foo/date=2019-06-01/.part-00000-2a7a63f2-7038-4aec-8f76-87077f91a415.c000.avro.crc
/tmp/foo/date=2019-06-01/part-00000-2a7a63f2-7038-4aec-8f76-87077f91a415.c000.avro
/tmp/foo/date=2019-06-01/.part-00001-2a7a63f2-7038-4aec-8f76-87077f91a415.c000.avro.crc
/tmp/foo/date=2019-06-01/part-00001-2a7a63f2-7038-4aec-8f76-87077f91a415.c000.avro
/tmp/foo/_SUCCESS
/tmp/foo/date=2019-06-02
/tmp/foo/date=2019-06-02/part-00002-2a7a63f2-7038-4aec-8f76-87077f91a415.c000.avro
/tmp/foo/date=2019-06-02/.part-00002-2a7a63f2-7038-4aec-8f76-87077f91a415.c000.avro.crc
...