Я пытаюсь запустить искровую работу с PySpark через ноутбук Jupyter, работающий в Docker. Рабочие расположены на отдельных машинах в одной сети. Я выполняю операцию take
на СДР:
data.take(number_of_elements)
Когда number_of_elements
равен 2000, все работает нормально. Когда это 20000, исключение происходит. С моей точки зрения это ломается, когда размер результата превышает 2ГБ (или мне так кажется). Идея о 2 ГБ исходит из того, что искра может отправлять результаты размером менее 2 ГБ в одном блоке, а когда результат больше 2 ГБ, начинает работать другой механизм, и что-то там ломается ( см. Здесь ). Вот исключение из журнала исполнителя:
19/11/05 10:27:14 INFO CodeGenerator: Code generated in 205.7623 ms
19/11/05 10:27:40 INFO PythonRunner: Times: total = 25421, boot = 3, init = 1751, finish = 23667
19/11/05 10:27:42 INFO MemoryStore: Block taskresult_4 stored as bytes in memory (estimated size 927.7 MB, free 6.4 GB)
19/11/05 10:27:42 INFO Executor: Finished task 0.0 in stage 3.0 (TID 4). 972788748 bytes result sent via BlockManager)
19/11/05 10:27:49 ERROR TransportRequestHandler: Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=1585998572000, chunkIndex=0}, buffer=org.apache.spark.storage.BlockManagerManagedBuffer@4399ad49} to /10.0.0.9:56222; closing connection
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
at sun.nio.ch.IOUtil.write(IOUtil.java:65)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
at org.apache.spark.util.io.ChunkedByteBufferFileRegion.transferTo(ChunkedByteBufferFileRegion.scala:64)
at org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:121)
at io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355)
at io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224)
at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382)
at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362)
at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901)
at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
at io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983)
at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248)
at io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
at java.lang.Thread.run(Thread.java:748)
Как мы видим из журнала, исполнитель пытается отправить результат на 10.0.0.9:56222
. Это терпит неудачу, потому что порт не открыт в docker compose. 10.0.0.9
- это IP-адрес главного узла, но порт 56222
является случайным, хотя я явно настроил все порты, которые можно найти в документации , чтобы отключить произвольный выбор порта:
spark = SparkSession.builder\
.master('spark://spark.cyber.com:7077')\
.appName('My App')\
.config('spark.task.maxFailures', '16')\
.config('spark.driver.port', '20002')\
.config('spark.driver.host', 'spark.cyber.com')\
.config('spark.driver.bindAddress', '0.0.0.0')\
.config('spark.blockManager.port', '6060')\
.config('spark.driver.blockManager.port', '6060')\
.config('spark.shuffle.service.port', '7070')\
.config('spark.driver.maxResultSize', '14g')\
.getOrCreate()
Я сопоставил эти порты с помощью docker compose:
version: "3"
services:
jupyter:
image: jupyter/pyspark-notebook:latest
ports:
- "4040-4050:4040-4050"
- "6060:6060"
- "7070:7070"
- "8888:8888"
- "20000-20010:20000-20010"