Получение org. apache .spark.SparkException при отмене сопротивления RDD в искре - PullRequest
0 голосов
/ 23 января 2020

Я написал код spark java, в котором я кеширую один RDD, а затем отменяю его.

rowJavaRDD.cache();
rowJavaRDD.foreach(row -> {
    if(row != null && row.length() > 0) {
        try {
            if("inputData".equalsIgnoreCase(row.getString(0))) {
                monitoringMetrics.add(StreamProcessorMetrics
                        .getStreamProcessorMetricsFromRow(row));
            }
        } catch (Exception e) {
        }
    }
});

inputRDD = rowJavaRDD.filter(row -> {
    if(row != null && row.length() > 0) {
        try {
            if("MetricsCollectionRecord".equalsIgnoreCase(row.getString(0))) {
                return false;
            } else {
                return true;
            }
        } catch (Exception e) {
            return true;
        }
    }
    return true;
});

mainDF = session.createDataFrame(inputRDD, inputTask.getRecordSchema());

rowJavaRDD.unpersist();

Я получаю ошибку ниже при запуске приложения spark после долгого запуска. Я не могу полностью понять сценарий.

20/01/21 16:40:00 WARN server.TransportChannelHandler: Исключение при подключении из /10.31.29.12:40484 java .io.IOException: сброс соединения по одноранговому узлу в sun.nio.ch.FileDispatcherImpl.read0 (собственный метод) в sun.nio.ch.SocketDispatcher.read (SocketDispatcher. java: 39) в sun.nio.ch.IOUtil .readIntoNativeBuffer (IOUtil. java: 223) в sun.nio.ch.IOUtil.read (IOUtil. java: 192) в sun.nio.ch.SocketChannelImpl.read (SocketChannelImpl. java: 380) по адресу io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes (PooledUnsafeDirectByteBuf. * ​​1025 *: 221) по адресу io.netty.buffer.AbstractByteBuf.writeBytes (AbstractByteBuf. * ​​10io. .NioSocketChannel. processSelectedKey (NioEventL oop. java: 643) на io.netty.channel.nio.NioEven tL oop .processSelectedKeysOptimized (NioEventL oop. java: 566) на io.netty.channel.nio.NioEventL oop .processSelectedKeys (NioEventL oop. java: 480) на io.netty. channel.nio.NioEventL oop .run (NioEventL oop. java: 442) на io.netty.util.concurrent.SingleThreadEventExecutor $ 2.run (SingleThreadEventExecutor. java: 131) на io.netty.util .concurrent.DefaultThreadFactory $ DefaultRunnableDecorator.run (DefaultThreadFactory. java: 144) в java .lang.Thread.run (Тема. java: 748) 20/01/21 16:40:00 ОШИБКА client.TransportResponseHandler : При закрытии соединения из /10.31.33.17:40484 по-прежнему остается 1 запрос, 20/01/21 16:40:00 WARN storage.BlockManagerMaster: Не удалось удалить RDD 574706 - Сброс соединения по пиру java .io.IOException: Соединение сбрасывается одноранговым узлом в sun.nio.ch.FileDispatcherImpl.read0 (собственный метод) в sun.nio.ch.SocketDispatcher.read (SocketDispatcher. java: 39) в sun.nio.ch.IOUtil.readIntoNativeBuffer (IOUtil. java: 223) в sun.nio.ch.IOUtil.read (IOUtil. java: 192) a t sun.nio.ch.SocketChannelImpl.read (SocketChannelImpl. java: 380) по адресу io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes (PooledUnsafeDirectByteBuf. * ​​1050 *: 22BybBybBuf.bb). . java: 899) в io.netty.channel.socket.nio.NioSocketChannel.doReadBytes (NioSocketChannel. java: 275) в io.netty.channel.nio.AbstractNioByteChannel $ NioByteUnsafe.read (AbstractNioBio). : 119) на io.netty.channel.nio.NioEventL oop .processSelectedKey (NioEventL oop. java: 643) на io.netty.channel.nio.NioEventL oop .processSelectedKeysOptimized (NioEventL oop . java: 566) на io.netty.channel.nio.NioEventL oop .processSelectedKeys (NioEventL oop. java: 480) на io.netty.channel.nio.NioEventL oop .run ( NioEventL oop. java: 442) в io.netty.util.concurrent.SingleThreadEventExecutor $ 2.run (SingleThreadEventExecutor. java: 131) в io.netty.util.concurrent.DefaultThreadFactory $ DefaultRunnableDecoThr. java: 144) на java .lang.Thread.run (Тема. java: 748) 20.01.21 16:40:00 INFO cluster.YarnSchedulerBackend $ YarnDriverEndpoint: отключение исполнителя 38. 20/01/21 16:40:00 INFO scheduler.DAGScheduler: Исполнитель потерян: 38 ( эпоха 41051) 20.01.21 16:40:00 INFO storage.BlockManagerMasterEndpoint: Попытка удалить исполнителя 38 из BlockManagerMaster. 20/01/21 16:40:00 WARN storage.BlockManagerMasterEndpoint: больше нет доступных реплик для rdd_574635_39! 20/01/21 16:40:00 WARN storage.BlockManagerMasterEndpoint: больше нет доступных реплик для rdd_574635_3! 20.01.21 16:40:00 ВНИМАНИЕ ХРАНЕНИЕ. BlockManagerMasterEndpoint: больше нет доступных реплик для rdd_574705_20! 20/01/21 16:40:00 WARN storage.BlockManagerMasterEndpoint: больше нет доступных реплик для rdd_574635_75! 20/01/21 16:40:00 WARN storage.BlockManagerMasterEndpoint: больше нет доступных реплик для rdd_574705_56! 20/01/21 16:40:00 WARN storage.BlockManagerMasterEndpoint: больше нет доступных реплик для rdd_574635_21! 20/01/21 16:40:00 WARN storage.BlockManagerMasterEndpoint: больше нет доступных реплик для rdd_574705_2! 20/01/21 16:40:00 WARN storage.BlockManagerMasterEndpoint: больше нет доступных реплик для rdd_574635_57! 20/01/21 16:40:00 WARN storage.BlockManagerMasterEndpoint: больше нет доступных реплик для rdd_574705_38! 20/01/21 16:40:00 WARN storage.BlockManagerMasterEndpoint: больше нет доступных реплик для rdd_574705_74! 20/01/21 16:40:00 INFO storage.BlockManagerMasterEndpoint: удаление менеджера блоков BlockManagerId (38, hslave33017, 34543, None) 20.01.01 16:40:00 INFO storage.BlockManagerMaster: успешно удалено 38 в removeExecutor 20 / 21.01 16:40:00 INFO yarn.YarnAllocator: Завершенный контейнер container_e27_1576105203449_42801_01_000039 на хосте: hslave33017.sjc109 (состояние: ЗАВЕРШЕНО, статус выхода: -104) 20.01.01 16:40:00 Планировщик ИНФО.JobScheduduler Finish: потоковое задание 1579653540000 ms.0 из заданного времени задания 1579653540000 ms 20/01/21 16:40:00 WARN yarn.YarnAllocator: контейнер, уничтоженный YARN за превышение пределов памяти. 11,0 ГБ из 11 ГБ физической памяти. Попробуйте увеличить spark.yarn.executor.memoryOverhead. 20/01/21 16:40:00 WARN cluster.YarnSchedulerBackend $ YarnSchedulerEndpoint: Запрос драйвера на удаление исполнителя 38 по причине контейнера, уничтоженного YARN за превышение пределов памяти. 11,0 ГБ из 11 ГБ физической памяти. Попробуйте увеличить spark.yarn.executor.memoryOverhead. 20/01/21 16:40:00 INFO core.JobListener: Пакетная обработка завершена, Всего записей: 73911 20/01/21 16:40:00 INFO core.JobListener: Пакетная обработка завершена, Общая задержка: 60774 мс 20/01/21 16:40:00 ИНФОРМАЦИЯ о потоковой передаче.MetricsAccumulator: имя метрики: Streaming Metrics 20/01/21 16:40:00 ИНФОРМАЦИЯ о потоковой передаче.MetricsAccumulator: metricMap размер: 2 20/01/21 16:40:00 ИНФОРМАЦИЯ о потоковой передаче.MetricsАккумулятор: Metrics Тип: NodeStats 20/01/21 16:40:00 Потоковая информация.MetricsAccumulator: Всего записей: 73911 20.01.16 16:40:00 Потоковая информация.MetricsAccumulator: Успешные записи: 73911 20/01/21 16:40: 00 ОШИБКА cluster.YarnClusterScheduler: Потерянный исполнитель 38 на hslave33017.sjc109: Контейнер уничтожен YARN за превышение пределов памяти. 11,0 ГБ из 11 ГБ физической памяти. Рассмотрите возможность повышения spark.yarn.executor.memoryOverhead. 20/01/21 16:40:00 Потоковая информация INFO.MetricsAccumulator: сбойные записи: 0 20/01/21 16:40:00 Поток INFO streaming.MetricsAccumulator: Размер сбойных записей : 0 20/01/21 16:40:00 ИНФОРМАЦИЯ о потоковой передаче.MetricsAccumulator: Тип метрики: KAFKA 20.01.21 16:40:00 ИНФОРМАЦИЯ о потоковой передаче.MetricsAccumulator: Всего записей: 73911 20/01/21 16:40:00 INFO streaming.MetricsAccumulator: успешные записи: 73911 20/01/21 16:40:00 INFO streaming.MetricsAccumulator: сбойные записи: 0 20/01/21 16:40:00 INFO streaming.MetricsAccumulator: сбойные записи Размер: 0 20 / 21.01 16:40:00 INFO storage.BlockManagerMaster: запрошено удаление исполнителя 38 20.01.21 16:40:00 INFO cluster.YarnSchedulerBackend $ YarnDriverEndpoint: запрошено удалить несуществующего исполнителя 38 20/01/21 16: 40:00 INFO storage.BlockManagerMasterEndpoint: Попытка удалить исполнителя 38 из BlockManagerMaster. 20/01/21 16:40:00 INFO streaming.StreamingMetricsDBObject: Размер таблицы измерений: 13 20/01/21 ERROR scheduler.JobScheduler: Ошибка при выполнении задания потоковой передачи задания 1579653540000 ms.0 org. apache .spark.SparkException : Исключение, генерируемое в awaitResult: в орг. apache .spark.util.ThreadUtils $ .awaitResult (ThreadUtils. scala: 205) в орг. apache .spark.rp c .RpcTimeout.awaitResult (RpcTimeout. scala: 75) в орг. apache .spark.storage.BlockManagerMaster. removeRdd (BlockManagerMaster. scala: 126) в орг. apache .spark.SparkContext.unpersistRDD (SparkContext. scala: 1796) в орг. apache .spark.rdd.RDD.unpersist (RDD . scala: 216) в орг. apache .spark.api. java .JavaRDD.unpersist (JavaRDD. scala: 53) в com.example.bigdata.streamnow.core.StreamingProcessor $ 2.call ( StreamingProcessor. java: 192) в com.example.bigdata.streamnow.core.StreamingProcessor $ 2.call (StreamingProcessor. java: 144) в орг. apache .spark.streaming.api. java .JavaDStreamLike $$ anonfun $ foreachRDD $ 1.apply (JavaDStreamLike. scala: 272) в орг. apache .spark.streaming.api. java .JavaDStreamLike $$ anonfun $ foreachRDD $ 1.apply (JavaDStreamLike. scala: 272) в орг. apache .spark.streaming.dstream.DStream $$ anonfun $ foreachRDD $ 1 $$ anonfun $ apply $ mcV $ sp $ 3.apply (DStream. scala: 628) в орг. apache. spark.streaming.dstream.DStream $$ anonfun $ foreachRDD $ 1 $$ anonfun $ apply $ mcV $ sp $ 3.apply (DStream. scala: 628) в орг. apache .spark.streaming.dstream.ForEachDStream $$ anonfun $ 1 $$ anonfun $ применять $ мкВ $ зр $ 1.Apply $ мкВ $ зр (ForEac hDStream. scala: 51) в орг. apache .spark.streaming.dstream.ForEachDStream $$ anonfun $ 1 $$ anonfun $ apply $ mcV $ sp $ 1.apply (ForEachDStream. scala: 51) в орг. apache .spark.streaming.dstream.ForEachDStream $$ anonfun $ 1 $$ anonfun $ apply $ mcV $ sp $ 1.apply (ForEachDStream. scala: 51) в орг. apache .spark.streaming.dstream.DStream .createRDDWithLocalProperties (DStream. scala: 416) в орг. apache .spark.streaming.dstream.ForEachDStream $$ anonfun $ 1.apply $ mcV $ sp (ForEachDStream. scala: 50) в орг. apache .spark.streaming.dstream.ForEachDStream $$ anonfun $ 1.apply (ForEachDStream. scala: 50) в org. apache .spark.streaming.dstream.ForEachDStream $$ anonfun $ 1.apply (ForEachDStream. scala: 50) в scala .util.Try $ .apply (Try. scala: 192) в org. apache .spark.streaming.scheduler.Job.run (Job. scala: 39) в org. apache .spark.streaming.scheduler.JobScheduler $ JobHandler $$ anonfun $ run $ 1.apply $ mcV $ sp (JobScheduler. scala: 257) в орг. apache .spark.streaming.scheduler.JobScheduler $ JobHandler $$ anonfun $ run $ 1.apply (JobScheduler. scala: 257) в орг. apache .spar k.streaming.scheduler.JobScheduler $ JobHandler $$ anonfun $ run $ 1.apply (JobScheduler. scala: 257) в scala .util.DynamicVariable.withValue (DynamicVariable. scala: 58) в орг. apache .spark.streaming.scheduler.JobScheduler $ JobHandler.run (JobScheduler. scala: 256) в java .util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor. java: 1149) в java .util. concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor. java: 624) в java .lang.Thread.run (Thread. java: 748) Причина: java .io.IOException: Сброс соединения по одноранговому узлу в sun.nio.ch.FileDispatcherImpl.read0 (собственный метод) в sun.nio.ch.SocketDispatcher.read (SocketDispatcher. java: 39) в sun.nio.ch.IOUtil.readIntoNativeBuffer (IOUtil. java: 223) в sun.nio.ch.IOUtil.read (IOUtil. java: 192) в sun.nio.ch.SocketChannelImpl.read (SocketChannelImpl. java: 380) в io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes (PooledUnsafeDirectByteBuf. * ​​1136 *: 221) на io.netty.buffer.AbstractByteBuf.writeBytes (AbstractByteBuf. * ​​1137 *: 899) на io.netty. channel.socket.nio.NioSocketChannel.doReadBytes (NioSocketChannel. java: 275) по адресу io.netty.channel.nio.AbstractNioByteChannel $ NioByteUnsafe.read (AbstractNioByteChannel. java: 119.no. NioEventL oop .processSelectedKey (NioEventL oop. java: 643) на io.netty.channel.nio.NioEventL oop .processSelectedKeysOptimized (NioEventL oop. java: 566) на io.netty. channel.nio.NioEventL oop .processSelectedKeys (NioEventL oop. java: 480) на io.netty.channel.nio.NioEventL oop .run (NioEventL oop. java: 442) в io.netty.util.concurrent.SingleThreadEventExecutor $ 2.run (SingleThreadEventExecutor. java: 131) в io.netty.util.concurrent.DefaultThreadFactory $ DefaultRunnableDecorator.run (DefaultThreadFactory. java: 144) ... Еще 1 20/01/21 16:40:00 ОШИБКА core.StreamingProcessor: Исключительная ситуация в runProcessor org. apache .spark.SparkException: исключение, генерируемое в awaitResult: at org. apache .spark.util.ThreadUtils $. awaitResult (ThreadUtils. scala: 205) в орг. apache .spark.rp c .RpcTimeout.awaitResult (RpcTimeout. scala: 75) в орг. apache .spark.storage.BlockManagerMaster.removeRdd (BlockManagerMaster. scala: 126) в орг. apache .spark.SparkContext.unpersistRDD (SparkContext. scala: 1796) в орг. apache .spark.rdd.RDD.unpersist (RDD. scala : 216) в орг. apache .spark.api. java .JavaRDD.unpersist (JavaRDD. scala: 53) в com.example.bigdata.streamnow.core.StreamingProcessor $ 2.call (StreamingProcessor. java: 192) на com.example.bigdata.streamnow.core.StreamingProcessor $ 2.call (StreamingProcessor. java: 144) на орг. apache .spark.streaming.api. java .JavaDStreamLike $$ anonfun $ foreachRDD $ 1.apply (JavaDStreamLike. scala: 272) в org. apache .spark.streaming.api. java .JavaDStreamLike $$ anonfun $ foreachRDD $ 1.apply (JavaDStreamLike. scala: 27 2) в орг. apache .spark.streaming.dstream.DStream $$ anonfun $ foreachRDD $ 1 $$ anonfun $ apply $ mcV $ sp $ 3.apply (DStream. scala: 628) в орг. apache. spark.streaming.dstream.DStream $$ anonfun $ foreachRDD $ 1 $$ anonfun $ apply $ mcV $ sp $ 3.apply (DStream. scala: 628) в орг. apache .spark.streaming.dstream.ForEachDStream $$ anonfun $ 1 $$ anonfun $ apply $ mcV $ sp $ 1.apply $ mcV $ sp (ForEachDStream. scala: 51) в org. apache .spark.streaming.dstream.ForEachDStream $$ anonfun $ 1 $$ anonfun $ apply $ mcV $ sp $ 1.apply (ForEachDStream. scala: 51) в орг. apache .spark.streaming.dstream.ForEachDStream $$ anonfun $ 1 $$ anonfun $ apply $ mcV $ sp $ 1.apply (ForEachDStream. scala: 51) в орг. apache .spark.streaming.dstream.DStream.createRDDWithLocalProperties (DStream. scala: 416) в орг. apache .spark.streaming.dstream.ForEachDStream $$ anonfun $ 1.apply $ mcV $ sp (ForEachDStream. scala: 50) в орг. apache .spark.streaming.dstream.ForEachDStream $$ anonfun $ 1.apply (ForEachDStream. scala: 50) в орг. apache .spark .streaming.dstream.ForEachDStream $$ anonfun $ 1.apply (ForEachDStream. scala: 50) в scala .util.Try $ .apply (Try. scala: 192) в орг. apache .spark.streaming.scheduler.Job.run (Job. scala: 39) в орг. apache. spark.streaming.scheduler.JobScheduler $ JobHandler $$ anonfun $ run $ 1.apply $ mcV $ sp (JobScheduler. scala: 257) в орг. apache .spark.streaming.scheduler.JobScheduler $ JobHandler $$ anonfun $ запустите $ 1.apply (JobScheduler. scala: 257) в орг. apache .spark.streaming.scheduler.JobScheduler $ JobHandler $$ anonfun $ запустите $ 1.apply (JobScheduler. scala: 257) в scala .util.DynamicVariable.withValue (DynamicVariable. scala: 58) в орг. apache .spark.streaming.scheduler.JobScheduler $ JobHandler.run (JobScheduler. scala: 256) в java .util.concurrent .ThreadPoolExecutor.runWorker (ThreadPoolExecutor. java: 1149) в java .util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor. java: 624) в java .lang.Thread.run (поток. java: 748) Причина: java .io.IOException: Сброс соединения по одноранговому узлу на sun.nio.ch.FileDispatcherImpl.read0 (собственный метод) на sun.nio.ch.SocketDispatcher.read (SocketDispatcher. java : 39) в су n.nio.ch.IOUtil.readIntoNativeBuffer (IOUtil. java: 223) в sun.nio.ch.IOUtil.read (IOUtil. java: 192) в sun.nio.ch.SocketChannelImpl.read (SocketChannelImpl. java: 380) на io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes (PooledUnsafeDirectByteBuf. * ​​1220 *: 221) на io.netty.buffer.AbstractByteBuf.writeBytes (AbstractBytenel *.net 8: at. .socket.nio.NioSocketChannel.doReadBytes (NioSocketChannel. java: 275) на io.netty.channel.nio.AbstractNioByteChannel $ NioByteUnsafe.read (AbstractNioByteChannel. java: 119). oop .processSelectedKey (NioEventL oop. java: 643) в io.netty.channel.nio.NioEventL oop .processSelectedKeysOptimized (NioEventL oop. java: 566) в io.netty.channel .nio.NioEventL oop. processSelectedKeys (NioEventL oop. java: 480) на io.netty.channel.nio.NioEventL oop .run (NioEventL oop. java: 442) на io.netty.util.concurrent.SingleThreadEventExecutor $ 2.run (SingleThreadEventExecutor. java: 131) на io.netty.util.concurrent.DefaultThreadFactory $ DefaultRunnableDecorator.run (DefaultThreadFactory. java: 144)

Похоже, что во время unpersist ( ) соединение вызова метода получило сброс от исполнителя, поскольку оно выходит из памяти. Из-за чего искра вызывает возбуждение.

Правильно ли мое объяснение или что-то еще происходит? И как справиться с таким сценарием? поможет ли простое добавление try catch или произойдет потеря данных?

...