Не удается загрузить данные .csv в локальном соединении Spark при использовании sparklyr - PullRequest
0 голосов
/ 29 января 2019

Справочная информация : я совершенно новичок во всей платформе и концепции Spark и пытаюсь научиться работать с ней через R с sparklyr.Я начал посещать онлайн-курс по этой теме и пытаюсь использовать его для собственного анализа данных в качестве способа изучения.

Проблема : я пытаюсь загрузить 6,3 ГБнабор данных CSV (~ 30 мил строк, ~ 20 столбцов), но я получаю следующую ошибку (из того, что я мог сказать, одни и те же куски повторялись, я даю здесь первые 3 из них, так как в противном случае я достигну предела символов дляпост).Код выполняется, но через 17 минут он завершается со следующей ошибкой (данные не загружены):

Error: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:

org.apache.spark.SparkContext.getOrCreate(SparkContext.scala)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
java.lang.reflect.Method.invoke(Unknown Source)
sparklyr.Invoke.invoke(invoke.scala:139)
sparklyr.StreamHandler.handleMethodCall(stream.scala:123)
sparklyr.StreamHandler.read(stream.scala:66)
sparklyr.BackendHandler.channelRead0(handler.scala:51)
sparklyr.BackendHandler.channelRead0(handler.scala:4)
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)

The currently active SparkContext was created at:

org.apache.spark.SparkContext.getOrCreate(SparkContext.scala)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
java.lang.reflect.Method.invoke(Unknown Source)
sparklyr.Invoke.invoke(invoke.scala:139)
sparklyr.StreamHandler.handleMethodCall(stream.scala:123)
sparklyr.StreamHandler.read(stream.scala:66)
sparklyr.BackendHandler.channelRead0(handler.scala:51)
sparklyr.BackendHandler.channelRead0(handler.scala:4)
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)

    at org.apache.spark.SparkContext.assertNotStopped(SparkContext.scala:100)
    at org.apache.spark.SparkContext$$anonfun$parallelize$1.apply(SparkContext.scala:716)
    at org.apache.spark.SparkContext$$anonfun$parallelize$1.apply(SparkContext.scala:715)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.SparkContext.withScope(SparkContext.scala:701)
    at org.apache.spark.SparkContext.parallelize(SparkContext.scala:715)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:87)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:87)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:185)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:64)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:592)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at sparklyr.Invoke.invoke(invoke.scala:139)
    at sparklyr.StreamHandler.handleMethodCall(stream.scala:123)
    at sparklyr.StreamHandler.read(stream.scala:66)
    at sparklyr.BackendHandler.channelRead0(handler.scala:51)
    at sparklyr.BackendHandler.channelRead0(handler.scala:4)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:652)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:575)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:489)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:451)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
    at java.lang.Thread.run(Unknown Source)
Error: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:

Вот мой код R:

library(sparklyr)
spark_install(version = "2.1.0")
sc <- spark_connect(master = "local")
testdata = spark_read_csv(sc, name = "testdata", path = ...)

За пределами Spark явозможность загрузить файл, например, используя read_csv.Я погуглил проблему, и было упомянуто, что потенциальной причиной является проблема OutOfMemory - я не совсем уверен, если это проблема и как ее исправить.

Буду признателен, если кто-нибудь подскажет мне способ отладки и исправления!

Приветствия

1 Ответ

0 голосов
/ 01 февраля 2019

Несколько вещей:

  • Если вы не используете крошечные данные, вы должны забыть о режиме local.Он в первую очередь предназначен для тестирования и небольших экспериментов, а не для работы даже с данными среднего размера.

    Поскольку он использует только одну JVM для кода драйвера и исполнителя, существует вероятность серьезного восстановления после сбоев, и если обработка идет AWOL, вы можете потерять весь сеанс (что, похоже, имеет место).

    Так что, если вы хотите проверить вещи локально на данных среднего размера, рассмотрите возможность использования автономный режим в противном случае просто уменьшите набор данных.

    В примечании стороны * В режиме 1014 * используется только одинобработка резьбы.Даже для тестирования имеет смысл использовать local[n] (для n потоков) или local[*] (для всех доступных ядер).

  • Будьте готовы настроить конфигурацию по умолчаниюзначения очень консервативны - например, spark.driver.memory по умолчанию составляет 1 ГБ - вам это может сойти с рук, в автономном режиме, но не тогда, когда все компоненты встроены в одну JVM.

  • Не доверяйте стандартным настройкам sparklyr.

    Разработчики sparklyr сделали очень неудачный выбор кеширования данных в памяти по умолчанию.Он не только идет вразрез со значениями по умолчанию Spark (которые по причине , использует MEMORY_AND_DISK для Dataset API) и редко дает какие-либо практические преимущества для данных реального размера, но также мешает оптимизатору Spark в некоторых уродливыхспособы (в первую очередь предотвращающие проецирование и выбор вниз).

    Так что приобретите привычку использовать memory = FALSE, когда это применимо:

    spark_read_csv(sc, name = "testdata", memory = FALSE, path = ...)
    
  • Предоставить схему для читателявместо использования схемы вывода.См. SparklyR: Конвертировать непосредственно в паркет

...