Spark SQL cannot overwrite large parquet partitions
1 vote / March 16, 2019

I am overwriting an existing parquet partition in apache spark 2.2 every time a request comes in to refresh the whole partition. The data is skewed, so some partitions are huge. Initially I do not repartition the dataframe (created from the source data) and try to write it with the following pseudo spark sql:

insert overwrite table mytable partition(a=123, b=234) select c1, c2, c3 from mydataframe where a=123 and b=234

(I could instead use a dynamic-partition overwrite in SQL, but that changes nothing about this problem.)
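For completeness, a minimal sketch of how I issue that statement from Scala; the session setup, table name, and source path below are placeholders rather than my real code:

// Minimal sketch of the write: Spark 2.2, Hive-backed parquet table.
// Table, column, and path names are placeholders.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("overwrite-partition")
  .enableHiveSupport()            // needed for insert overwrite on a Hive table
  .getOrCreate()

// dataframe created from the source data; no repartitioning at this point
val mydataframe = spark.read.parquet("/path/to/source")   // placeholder path
mydataframe.createOrReplaceTempView("mydataframe")

spark.sql(
  """insert overwrite table mytable partition(a=123, b=234)
    |select c1, c2, c3 from mydataframe where a=123 and b=234""".stripMargin)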

Now, since this partition holds a huge amount of data (more than 5 GB), I get java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE. This seems to indicate that a block size has exceeded Spark's 2 GB limit. The common solution suggested for this is to increase the number of partitions of your dataframe.
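What I understand by "increase the number of partitions of your dataframe" is something along these lines; the count of 200 is an arbitrary placeholder, not a tuned value:

// Sketch of the suggested fix: bump the number of dataframe partitions
// before the write so that no single shuffle block exceeds the 2 GB limit.
val repartitioned = mydataframe
  .where("a = 123 and b = 234")
  .repartition(200)              // 200 is arbitrary, not a tuned value

repartitioned.createOrReplaceTempView("mydataframe_repart")
spark.sql(
  """insert overwrite table mytable partition(a=123, b=234)
    |select c1, c2, c3 from mydataframe_repart""".stripMargin)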

If I increase the number of partitions in the dataframe, I then get

CommitDeniedException: attempt Not committed because the driver did not authorize commit

and

TaskCommitDenied (Driver denied task commit) for job: 124, partition: 39, attemptNumber: 0

Below is the sequence of exceptions I see in most executor logs: a shuffle fetch failure first, then the CommitDeniedException, and then an OutOfMemoryError towards the end, where the executor dies.

19/03/15 12:42:52 ERROR shuffle.OneForOneBlockFetcher: Failed while starting block fetches

19/03/15 13:10:22 ERROR util.Utils: Aborting task
org.apache.spark.executor.CommitDeniedException: attempt_20190315131022_0085_m_000042_360: Not committed because the driver did not authorize commit
        at org.apache.spark.mapred.SparkH

19/03/15 13:34:48 ERROR util.Utils: Uncaught exception in thread driver-heartbeater
java.lang.OutOfMemoryError: Java heap space

I cannot figure out why I am getting TaskCommitDenied. spark.speculation is disabled by default. I think repartitioning the data may be the cause of this problem. Correct me if I am wrong, but since I am overwriting data into physical parquet partitions, shouldn't the number of partitions in the dataframe also be the same? Otherwise several tasks will probably try to overwrite the same partition at the same time. If your dataframe has more partitions than the physical partition, this could happen.
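For reference, this is roughly what I mean by lining up the dataframe partitioning with the physical parquet partitioning (column names are the same placeholders as above):

// Sketch of repartitioning on the partition columns, so all rows destined
// for the (a=123, b=234) directory end up in the same dataframe partition
// before the write. Column names are placeholders.
import org.apache.spark.sql.functions.col

val aligned = mydataframe
  .where(col("a") === 123 && col("b") === 234)
  .repartition(col("a"), col("b"))

aligned.createOrReplaceTempView("mydataframe_aligned")
spark.sql(
  """insert overwrite table mytable partition(a=123, b=234)
    |select c1, c2, c3 from mydataframe_aligned""".stripMargin)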

So how do I overwrite a huge parquet partition using spark-sql?

Additional stack trace (a snippet from one executor; this happens on several nodes):

19/03/16 15:39:08 INFO codegen.CodeGenerator: Code generated in 10.443414 ms
19/03/16 15:39:08 INFO codegen.CodeGenerator: Code generated in 29.516827 ms
19/03/16 15:39:08 INFO codegen.CodeGenerator: Code generated in 51.646654 ms
19/03/16 15:39:08 INFO parquet.MapredParquetOutputFormat: creating new record writer...org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat@4c0e1ce4
19/03/16 15:39:08 INFO write.ParquetRecordWriterWrapper: initialize serde with table properties.
19/03/16 15:39:08 INFO write.ParquetRecordWriterWrapper: creating real writer to write at maprfs:/mdsMyProject/hive/warehouse/MyProjectint4242.db/table_12341factTable/.hive-staging_hive_2019-03-16_14-57-17_337_1173214459531843754-3/-ext-10000/_temporary/0/_temporary/attempt_20190316153908_0020_m_000121_0/business_id=1224/period_id=684039130/part-00121-b2416c44-6e2c-4f27-993e-0328fd64ea2e.c000
19/03/16 15:39:09 INFO write.ParquetRecordWriterWrapper: real writer: parquet.hadoop.ParquetRecordWriter@5659d6bb
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
19/03/16 15:42:36 INFO mapred.SparkHadoopMapRedUtil: attempt_20190316153908_0020_m_000121_0: Not committed because the driver did not authorize commit
19/03/16 15:42:36 ERROR util.Utils: Aborting task
org.apache.spark.executor.CommitDeniedException: attempt_20190316153908_0020_m_000121_0: Not committed because the driver did not authorize commit
        at org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:83)
        at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitTask(HadoopMapReduceCommitProtocol.scala:171)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:261)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:254)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1371)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:262)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:189)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:188)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
19/03/16 15:42:36 WARN output.FileOutputCommitter: Could not delete maprfs:/mdsMyProject/hive/warehouse/MyProjectint4242.db/table_12341factTable/.hive-staging_hive_2019-03-16_14-57-17_337_1173214459531843754-3/-ext-10000/_temporary/0/_temporary/attempt_20190316153908_0020_m_000121_0
19/03/16 15:42:36 ERROR datasources.FileFormatWriter: Job job_20190316153908_0020 aborted.
19/03/16 15:52:03 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 1312
19/03/16 15:52:03 INFO executor.Executor: Running task 7.0 in stage 19.4 (TID 1312)
19/03/16 15:52:03 INFO spark.MapOutputTrackerWorker: Updating epoch to 42 and clearing cache
19/03/16 15:52:03 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 40
19/03/16 15:52:03 INFO client.TransportClientFactory: Successfully created connection to /10.250.70.17:33652 after 2 ms (0 ms spent in bootstraps)
19/03/16 15:52:03 INFO memory.MemoryStore: Block broadcast_40_piece0 stored as bytes in memory (estimated size 17.1 KB, free 11.0 GB)

19/03/16 16:04:38 INFO spark.MapOutputTrackerWorker: Don't have map outputs for shuffle 2, fetching them
19/03/16 16:04:38 INFO spark.MapOutputTrackerWorker: Doing the fetch; tracker endpoint = NettyRpcEndpointRef(spark://MapOutputTracker@10.250.70.17:32803)
19/03/16 16:04:38 INFO spark.MapOutputTrackerWorker: Don't have map outputs for shuffle 2, fetching them
19/03/16 16:04:38 INFO spark.MapOutputTrackerWorker: Got the output locations
19/03/16 16:04:38 INFO storage.ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 32 blocks
19/03/16 16:04:38 INFO storage.ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 32 blocks
19/03/16 16:04:38 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 9 ms
19/03/16 16:04:38 INFO storage.ShuffleBlockFetcherIterator: Started 1 remote fetches in 14 ms
19/03/16 16:04:38 INFO codegen.CodeGenerator: Code generated in 153.597538 ms
19/03/16 16:04:38 INFO codegen.CodeGenerator: Code generated in 21.508826 ms
19/03/16 16:04:38 INFO codegen.CodeGenerator: Code generated in 13.097823 ms
19/03/16 16:05:18 WARN memory.TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
19/03/16 16:05:18 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
19/03/16 16:05:18 WARN memory.TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
19/03/16 16:05:19 WARN memory.TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
19/03/16 16:05:19 INFO storage.DiskBlockManager: Shutdown hook called
19/03/16 16:05:20 WARN memory.TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
19/03/16 16:05:20 WARN memory.TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
19/03/16 16:05:21 WARN memory.TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
19/03/16 16:05:22 WARN memory.TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
19/03/16 16:05:23 WARN memory.TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
19/03/16 16:05:23 WARN memory.TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
19/03/16 16:05:24 WARN memory.TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
19/03/16 16:05:25 WARN memory.TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
19/03/16 16:05:25 WARN memory.TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
19/03/16 16:05:26 WARN memory.TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
19/03/16 16:05:27 WARN memory.TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
19/03/16 16:05:28 WARN memory.TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
19/03/16 16:05:28 WARN memory.TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
19/03/16 16:05:28 INFO util.ShutdownHookManager: Shutdown hook called
19/03/16 16:05:29 WARN memory.TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
19/03/16 16:05:30 WARN memory.TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
19/03/16 16:05:30 INFO util.ShutdownHookManager: Deleting directory /tmp/hadoop-myCompany/nm-local-dir/usercache/myCompany/appcache/application_1552538107854_5893/spark-fc8ff151-20b7-46b2-b1d6-7cc047f1b495
19/03/16 16:05:31 WARN memory.TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
^@^Fstdout^@^C124#
# java.lang.OutOfMemoryError: Java heap space
# -XX:OnOutOfMemoryError="kill %p"
#   Executing /bin/sh -c "kill 18350"...

Sometimes I see more of the stack trace when the OOM happens:


19/03/16 16:04:38 INFO codegen.CodeGenerator: Code generated in 19.177788 ms
19/03/16 16:04:38 INFO codegen.CodeGenerator: Code generated in 12.840521 ms
19/03/16 16:05:15 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
19/03/16 16:05:15 ERROR executor.Executor: Exception in task 10.0 in stage 20.2 (TID 1331)
java.lang.OutOfMemoryError: Java heap space
        at java.io.FileInputStream.close(FileInputStream.java:326)
        at java.io.FilterInputStream.close(FilterInputStream.java:181)
        at org.apache.spark.network.util.LimitedInputStream.close(LimitedInputStream.java:125)
        at java.io.FilterInputStream.close(FilterInputStream.java:181)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:438)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:61)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
        at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:190)
        at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:108)
        at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
19/03/16 16:05:15 INFO storage.DiskBlockManager: Shutdown hook called
19/03/16 16:05:15 ERROR executor.Executor: Exception in task 24.0 in stage 20.2 (TID 1346)
19/03/16 16:05:15 INFO storage.DiskBlockManager: Shutdown hook called
19/03/16 16:05:15 ERROR executor.Executor: Exception in task 24.0 in stage 20.2 (TID 1346)
java.lang.OutOfMemoryError: Java heap space
        at java.lang.Integer.valueOf(Integer.java:832)
        at scala.runtime.BoxesRunTime.boxToInteger(BoxesRunTime.java:65)
        at org.apache.spark.util.io.ChunkedByteBufferOutputStream.allocateNewChunkIfNeeded(ChunkedByteBufferOutputStream.scala:87)
        at org.apache.spark.util.io.ChunkedByteBufferOutputStream.write(ChunkedByteBufferOutputStream.scala:75)
        at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:342)
        at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
        at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
        at org.apache.spark.util.Utils$.copyStream(Utils.scala:348)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:421)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:61)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
        at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:190)
        at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:108)
        at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
19/03/16 16:05:15 INFO executor.Executor: Not reporting error to driver during JVM shutdown.
19/03/16 16:05:15 INFO executor.Executor: Not reporting error to driver during JVM shutdown.
19/03/16 16:05:15 ERROR util.SparkUncaughtExceptionHandler: [Container in shutdown] Uncaught exception in thread Thread[Executor task launch worker for task 1331,5,main]
19/03/16 16:05:15 ERROR util.SparkUncaughtExceptionHandler: [Container in shutdown] Uncaught exception in thread Thread[Executor task launch worker for task 1346,5,main]
java.lang.OutOfMemoryError: Java heap space
        at java.lang.Integer.valueOf(Integer.java:832)
        at scala.runtime.BoxesRunTime.boxToInteger(BoxesRunTime.java:65)
        at org.apache.spark.util.io.ChunkedByteBufferOutputStream.allocateNewChunkIfNeeded(ChunkedByteBufferOutputStream.scala:87)
        at org.apache.spark.util.io.ChunkedByteBufferOutputStream.write(ChunkedByteBufferOutputStream.scala:75)
        at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:342)
        at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
        at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
        at org.apache.spark.util.Utils$.copyStream(Utils.scala:348)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:421)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:61)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
        at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:190)
        at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:108)
        at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
19/03/16 16:05:16 INFO util.ShutdownHookManager: Shutdown hook called
19/03/16 16:05:16 INFO util.ShutdownHookManager: Deleting directory /tmp/hadoop-myCompany/nm-local-dir/usercache/myCompany/appcache/application_1552538107854_5893/spark-0d69a896-fbdf-4627-bc0b-7384c9421fa0
^@^Fstdout^@^C122#
# java.lang.OutOfMemoryError: Java heap space
# -XX:OnOutOfMemoryError="kill %p"
#   Executing /bin/sh -c "kill 930"...
...