Spark SQL cannot query Parquet partitions in S3
0 votes
/ 20 April 2020

I have 100 files, parquet_dir/*.snappy.parquet, stored as partitions in AWS S3; the total size is 6 GB. I cannot query these partitioned files, although the same query succeeds when reading the same partition files written to HDFS. Please advise how to deal with this problem.

val DF = spark.read.parquet("s3a://parquet_dir").cache() // "s3a://" needs two slashes
DF.createOrReplaceTempView("DF1")                        // registerTempTable is deprecated
spark.sql("select * from DF1").show()

Error message

2020-04-21 01:08:41,352 WARN storage.BlockManager: Putting block rdd_7_4 failed due to exception java.io.InterruptedIOException: Failed to open s3a://parquet_dir/part-00077-3c1ec48b-611e-4f96-97ed-10f0fe371dd8-c000.snappy.parquet at 4 on s3a://parquet_dir/part-00077-3c1ec48b-611e-4f96-97ed-10f0fe371dd8-c000.snappy.parquet: com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool.
2020-04-21 01:08:41,353 WARN storage.BlockManager: Block rdd_7_4 could not be removed as it was not found on disk or in memory
2020-04-21 01:08:41,359 ERROR executor.Executor: Exception in task 4.0 in stage 2.0 (TID 128)
java.io.InterruptedIOException: Failed to open s3a://parquet_dir/part-00077-3c1ec48b-611e-4f96-97ed-10f0fe371dd8-c000.snappy.parquet at 4 on s3a://parquet_dir/part-00077-3c1ec48b-611e-4f96-97ed-10f0fe371dd8-c000.snappy.parquet: com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
    at org.apache.hadoop.fs.s3a.S3AUtils.translateInterruptedException(S3AUtils.java:340)
    at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:171)
    at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)
    at org.apache.hadoop.fs.s3a.S3AInputStream.reopen(S3AInputStream.java:182)
    at org.apache.hadoop.fs.s3a.S3AInputStream.lambda$lazySeek$1(S3AInputStream.java:328)
    at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$2(Invoker.java:190)
    at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
    at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
    at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
    at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
    at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:188)
    at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:210)
    at org.apache.hadoop.fs.s3a.S3AInputStream.lazySeek(S3AInputStream.java:321)
    at org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:433)
    at java.io.DataInputStream.read(DataInputStream.java:149)
    at org.apache.parquet.io.DelegatingSeekableInputStream.readFully(DelegatingSeekableInputStream.java:102)
    at org.apache.parquet.io.DelegatingSeekableInputStream.readFullyHeapBuffer(DelegatingSeekableInputStream.java:127)
    at org.apache.parquet.io.DelegatingSeekableInputStream.readFully(DelegatingSeekableInputStream.java:91)
    at org.apache.parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:1174)
    at org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:805)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.checkEndOfRowGroup(VectorizedParquetRecordReader.java:301)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:256)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:159)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:181)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.scan_nextBatch_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:636)
    at org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anon$1.hasNext(InMemoryRelation.scala:125)
    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1165)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Answers [2]

0 votes
/ 23 April 2020

You have hit the limit of the S3A HTTP connection pool: other operations hold connections long enough that requests waiting for a connection from the pool time out.

Set the option spark.hadoop.fs.s3a.connection.maximum to something larger than the default (48).
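
A minimal sketch of one way to apply that setting when building the session (the property key is the standard S3A one; the value 200 and the app name are illustrative assumptions, not recommendations):

import org.apache.spark.sql.SparkSession

// Enlarge the S3A HTTP connection pool so that many concurrent Parquet
// column-chunk reads do not exhaust it and time out waiting for a connection.
val spark = SparkSession.builder()
  .appName("s3a-parquet-read")                              // hypothetical app name
  .config("spark.hadoop.fs.s3a.connection.maximum", "200")  // default is 48 per the answer above
  .getOrCreate()

The same value can also be passed on the command line, e.g. spark-submit --conf spark.hadoop.fs.s3a.connection.maximum=200.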

0 votes
/ 21 April 2020

Tried coalescing the DataFrame that reads the partitioned files, and it worked.
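
A sketch of what that workaround might look like, assuming "coalescing" means DataFrame.coalesce and using an arbitrary partition count of 24; reducing the number of partitions read in parallel keeps fewer S3A connections open at once:

// Collapse the read into fewer partitions so that fewer S3 streams
// compete for the HTTP connection pool at the same time.
val df = spark.read.parquet("s3a://parquet_dir").coalesce(24)
df.createOrReplaceTempView("DF1")
spark.sql("select * from DF1").show()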

...