Как удалить старые данные, созданные Spark Structured Streaming? - PullRequest
0 голосов
/ 20 марта 2020

как я могу удалить старые данные, созданные Spark Structured Streaming (Spark 2.4.5)?

У меня есть данные на HDFS в формате паркет / авро ( не Delta ), то есть создается Spark Structured Streaming и разбивается по времени (год, месяц, день месяца, час).

Данные создаются следующим образом:

query = df.writeStream.format("avro").partitionBy("year", "month", "day", "hour").outputMode("append").option("checkpointLocation", "/data/avro.cp").start("/data/avro")

As В результате у меня следующая структура папок разделов:

./year=2020/month=3/day=13/hour=12
./year=2020/month=3/day=13/hour=13
./year=2020/month=3/day=13/hour=14
./year=2020/month=3/day=13/hour=15
./year=2020/month=3/day=13/hour=16

Как удалить старые данные, например, старше года = 2020, месяца = 2, дня = 13, часа = 14?

Простое удаление соответствующих папок

./year=2020/month=3/day=13/hour=12
./year=2020/month=3/day=13/hour=13

вызывает исключение при чтении пакета данных из файловой системы:

df = spark.read.format("avro").load("/data/avro")
java.io.FileNotFoundException: File file:/data/avro/year=2020/month=3/day=13/hour=12/part-00000-0cc84e65-3f49-4686-85e3-1ecf48952794.c000.avro does not exist

Как я выяснил, это как-то связано с папкой _spark_metadata, которая используется контрольно-пропускными пунктами.

Благодарим Вас за помощь.

Ответы [ 2 ]

0 голосов
/ 22 марта 2020

Кажется, я нашел решение / обходной путь. Ключевой концепцией является использование FileStreamSinkLog и обновление его с помощью SinkFileStatus с действием, установленным на delete:

  1. load FileStreamSinkLog

    sinkLog = new FileStreamSinkLog(1, spark, full-path-to-spark-metadata-dir);
    
  2. get последний SinkFileStatus

    Option<Tuple2<Object, SinkFileStatus[]>> latest = sinkLog.getLatest();
    long batchId = (long)latest.get()._1;
    SinkFileStatus[] fileStatuses = latest.get()._2;
    
  3. удалить старые файлы

  4. Добавить новую запись с действием delete в fileStatuses массив

  5. запись batchId файл журнала обратно с обновленным fileStatuses

0 голосов
/ 20 марта 2020

Вы не можете удалить эту папку, если не удалите также соответствующие папки контрольных точек. Вы пытаетесь удалить папку, пока контрольная точка все еще знает об этом, поэтому возникает ошибка.

Однако я бы не советовал возиться с папкой контрольных точек без необходимости. Если это возможно в вашей ситуации, я бы посоветовал вместо этого перенести ваши старые данные в другие типы хранилищ данных, такие как AWS Стандарт -> Ледник.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...