Datapro c не может разархивировать файл .gz, заархивированный AWS Kinesis - PullRequest
1 голос
/ 20 января 2020

Моя компания пытается перевести службы с AWS на GCP. Мы сталкиваемся с некоторыми проблемами. Данные, собранные AWS Kinesis, представляют собой .gz файлы. Мы используем GCP Cloud Storage для передачи этих файлов на платформу GCP и используем Datapro c для обработки этих данных. Все эти данные могут быть правильно обработаны в AWS, но не могут быть правильно прочитаны тем же заданием Spark.

См. Сгенерированное исключение в конце.

Я попытался разархивировать один из этих файлов как ABC.gz в GCP Cloud Shell. Распакованный файл по-прежнему заканчивается на .gz: ABC.gz. Я думаю, что это причина root, поскольку Spark может попытаться разархивировать разархивированный файл.

Если мы переименуем эти файлы, удалив суффиксы .gz, Spark сможет работать нормально. Однако процесс переименования занимает слишком много времени и занимает более нескольких часов для обработки данных за один день.

Любые предложения приветствуются. Заранее спасибо.

Caused by: java.io.IOException: incorrect header check
  at org.apache.hadoop.io.compress.zlib.ZlibDecompressor.inflateBytesDirect(Native Method)
  at org.apache.hadoop.io.compress.zlib.ZlibDecompressor.decompress(ZlibDecompressor.java:225)
  at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:111)
  at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105)
  at java.io.InputStream.read(InputStream.java:101)
  at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:182)
  at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:218)
  at org.apache.hadoop.util.LineReader.readLine(LineReader.java:176)
  at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.skipUtfByteOrderMark(LineRecordReader.java:151)
  at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:191)
  at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
  at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.hasNext(HadoopFileLinesReader.scala:50)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:190)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$12$$anon$1.hasNext(WholeStageCodegenExec.scala:6
31)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:836)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:836)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)

1 Ответ

1 голос
/ 30 января 2020

Без дополнительных подробностей трудно сказать, что именно происходит, но, скорее всего, вы храните .gz файлы без сжатия или с использованием декомпрессивного транскодирования GCS . Это означает, что файлы, прочитанные Spark, уже распакованы (они не были сжаты в первую очередь или были распакованы клиентской библиотекой HTTP, если используется декомпрессивное транскодирование GCS), что приводит к сбою, поскольку Hadoop / Spark автоматически попытается распаковать файлы с .gz тоже расширение.

Если выше верно, похоже, у вас нет другого выбора, кроме как переименовать эти файлы, чтобы удалить расширение .gz. Также обратите внимание, что обработка сжатых Gzip файлов неэффективна в Spark / Had oop, поскольку они не разделяются.

...