Потеряю ли я данные при удалении поврежденного файла паркета, записанного потоковой структурой spark? - PullRequest
0 голосов
/ 25 мая 2019

Я использую spark-структурированную потоковую передачу в качестве потребителя для получения данных от kafka, следуя инструкциям, обратитесь к https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

Затем сохраните данные в формате hdfs в виде файла паркета.

ЗдесьМой вопрос: программа работает хорошо, но некоторые контейнеры редко выходят из строя (но это не помогло), что приводит к повреждению паркетных файлов.это приведет к ошибке, как [это не файл Parquet (слишком маленькая длина: 4)] или [.parquet не является файлом Parquet.ожидаемое магическое число в хвосте [80, 65, 82, 49], но найдено [56, 52, 53, 51]] при чтении их.Я должен переместить их в другие каталоги и убедиться, что запрос из улья работает хорошо.Но я не уверен, приведет ли это к потере данных из-за перемещения.

Я знаю, что для восстановления используется контрольная точка с искровой структурой, но поскольку некоторые данные записаны как паркет, я не уверен, что смещениепомечено как совершенное.

1 Ответ

0 голосов
/ 29 мая 2019

Я выполнил очень простое упражнение по загрузке txt-файла в каталог файлов, который читается структурированной потоковой передачей Spark. Писатель структурированного потока писал в файл паркета. После загрузки двух файлов я вижу, что метаданные, сгенерированные в spark, упоминают оба файла. Таким образом, если вы удалите один из них (включая файл метаданных, созданный с помощью приемника файлов), чтение файла паркета завершится с ошибкой из HDFS за исключением (файл не найден).

scala> val ParquetDF1 = spark.read.parquet("/user/root/sink2")
19/05/29 09:57:27 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 13.0 (TID 19, quickstart.cloudera, executor 2): org.apache.spark.SparkException: Exception thrown in awaitResult:
        at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:226)
        at org.apache.spark.util.ThreadUtils$.parmap(ThreadUtils.scala:290)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.readParquetFootersInParallel(ParquetFileFormat.scala:537)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$9.apply(ParquetFileFormat.scala:610)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$9.apply(ParquetFileFormat.scala:602)

Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /user/root/sink2/part-00000-454836ef-f7bc-444e-9a6b-e81e640a196d-c000.snappy.parquet
        at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:66)
        at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:56)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:2092)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:2062)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1975)

Единственное отличие здесь - вы используете Hive, а я непосредственно создаю фрейм данных Parquet из HDFS.

...