Моя компания пытается перевести службы с AWS на GCP. Мы сталкиваемся с некоторыми проблемами. Данные, собранные AWS Kinesis, представляют собой .gz
файлы. Мы используем GCP Cloud Storage для передачи этих файлов на платформу GCP и используем Datapro c для обработки этих данных. Все эти данные могут быть правильно обработаны в AWS, но не могут быть правильно прочитаны тем же заданием Spark.
См. Сгенерированное исключение в конце.
Я попытался разархивировать один из этих файлов как ABC.gz
в GCP Cloud Shell. Распакованный файл по-прежнему заканчивается на .gz
: ABC.gz
. Я думаю, что это причина root, поскольку Spark может попытаться разархивировать разархивированный файл.
Если мы переименуем эти файлы, удалив суффиксы .gz
, Spark сможет работать нормально. Однако процесс переименования занимает слишком много времени и занимает более нескольких часов для обработки данных за один день.
Любые предложения приветствуются. Заранее спасибо.
Caused by: java.io.IOException: incorrect header check
at org.apache.hadoop.io.compress.zlib.ZlibDecompressor.inflateBytesDirect(Native Method)
at org.apache.hadoop.io.compress.zlib.ZlibDecompressor.decompress(ZlibDecompressor.java:225)
at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:111)
at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105)
at java.io.InputStream.read(InputStream.java:101)
at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:182)
at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:218)
at org.apache.hadoop.util.LineReader.readLine(LineReader.java:176)
at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.skipUtfByteOrderMark(LineRecordReader.java:151)
at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:191)
at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.hasNext(HadoopFileLinesReader.scala:50)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:190)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$12$$anon$1.hasNext(WholeStageCodegenExec.scala:6
31)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:836)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:836)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)