У меня есть задание, которое анализирует приблизительный терабайт данных в формате json, разделенных на файлы размером 20 МБ (это происходит потому, что каждая минута получает по существу набор данных объемом 1 ГБ).
Задание анализирует, фильтрует и преобразует эти данные и записывает их обратно в другой путь. Однако, работает ли он, зависит от конфигурации искры.
Кластер состоит из 46 узлов с 96 ядрами и 768 ГБ памяти на узел. Драйвер имеет те же характеристики.
Я отправляю работу в автономном режиме и:
- При использовании 22g и 3 ядер на каждого исполнителя работа не выполняется из-за gc и OOM
File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
File "/opt/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError19/04/13 01:35:32 WARN TransportChannelHandler: Exception in connection from /10.0.118.151:34014
java.lang.OutOfMemoryError: GC overhead limit exceeded
at com.sun.security.sasl.digest.DigestMD5Base$DigestIntegrity.getHMAC(DigestMD5Base.java:1060)
at com.sun.security.sasl.digest.DigestMD5Base$DigestPrivacy.unwrap(DigestMD5Base.java:1470)
at com.sun.security.sasl.digest.DigestMD5Base.unwrap(DigestMD5Base.java:213)
at org.apache.spark.network.sasl.SparkSaslServer.unwrap(SparkSaslServer.java:150)
at org.apache.spark.network.sasl.SaslEncryption$DecryptionHandler.decode(SaslEncryption.java:126)
at org.apache.spark.network.sasl.SaslEncryption$DecryptionHandler.decode(SaslEncryption.java:101)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:88)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Thread.java:748)
: An error occurred while calling o54.json.
: java.lang.OutOfMemoryError: GC overhead limit exceeded
- При использовании 120 г и 15 ядер на каждого исполнителя задание выполняется успешно.
Почему задание не будет выполнено при настройке памяти и ядра меньшего размера?
Примечания:
Существует операция разнесения, которая также может быть связана. Изменить: не связано. Протестировал код, выполнил простой spark.read.json (). Count (). Show () и он gc'd и OOM'd.
Моя текущая теория домашних животных на данный момент заключается в том, что большое количество маленьких файлов приводит к высоким накладным расходам в случайном порядке. Это то, что происходит, и есть ли способ обойти это (за исключением повторной агрегации файлов отдельно)?
Код по запросу:
Launcher
./bin/spark-submit --master spark://0.0.0.0:7077 \
--conf "spark.executor.memory=90g" \
--conf "spark.executor.cores=12" \
--conf 'spark.default.parallelism=7200' \
--conf 'spark.sql.shuffle.partitions=380' \
--conf "spark.network.timeout=900s" \
--conf "spark.driver.extraClassPath=$LIB_JARS" \
--conf "spark.executor.extraClassPath=$LIB_JARS" \
--conf "spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
launcher.py
код
spark = SparkSession.builder \
.appName('Rewrites by Frequency') \
.getOrCreate()
spark.read.json("s3://path/to/file").count()