Как проанализировать и ограничить использование памяти геотреллы и искры для черепицы - PullRequest
0 голосов
/ 03 июля 2018

Наша главная цель - чтобы мы выполняли операции с большим объемом входных данных (около 80 ГБ). Проблема в том, что даже для небольших наборов данных мы часто получаем пространство кучи Java или другие ошибки, связанные с памятью.

Нашим временным решением было просто указать более высокий максимальный размер кучи (используя -Xmx локально или установив spark.executor.memory и spark.driver.memory для нашего экземпляра искры), но это, похоже, не обобщает, мы все равно сталкиваемся с ошибками для больших наборов данных или более высоких уровней масштабирования.

Для лучшего понимания, вот основная концепция того, что мы делаем с нашими данными:

  1. Загрузить данные, используя HadoopGeoTiffRDD.spatial(new Path(path))

  2. Сопоставить данные с плитками некоторого уровня масштабирования

    val extent = geotrellis.proj4.CRS.fromName("EPSG:4326").worldExtent
    val layout = layoutForZoom(zoom, extent)
    val metadata: TileLayerMetadata[SpatialKey] = dataSet.collectMetadata[SpatialKey](layout)
    val rdd = ContextRDD(dataSet.tileToLayout[SpatialKey](metadata), metadata)
    

    Где layoutForZoom в основном совпадает с geotrellis.spark.tiling.ZoomedLayoutScheme.layoutForZoom

  3. Затем мы выполняем некоторые операции над записями rdd, используя rdd.map и rdd.foreach для отображенных rdds.

  4. Мы агрегируем результаты четырех плиток, которые соответствуют одной плитке с более высоким уровнем масштабирования, используя groupByKey

  5. Переходите к 3, пока мы не достигнем определенного уровня масштабирования

Цель будет заключаться в следующем: учитывая ограничение памяти в X ГБ, разделить и обработать данные таким образом, чтобы мы потребляли не более X ГБ.

Похоже, что разбиение набора данных по tileToLayout уже занимает слишком много памяти при более высоких уровнях масштабирования (даже для очень маленьких наборов данных). Существуют ли альтернативы для разбиения на листы и загрузки данных в соответствии с неким LayoutDefinition? Насколько мы понимаем, HadoopGeoTiffRDD.spatial уже разбивает набор данных на небольшие области, которые затем делятся на плитки на tileToLayout. Можно ли как-то напрямую загрузить набор данных, соответствующий LayoutDefinition?

В нашем конкретном сценарии у нас есть 3 рабочих с 2 ГБ ОЗУ и 2 ядрами каждый. На одном из них также работает спаркмастер, который получает свою работу через spark-submit из экземпляра драйвера. Мы попробовали конфигурации, подобные этой:

val conf = new SparkConf().setAppName("Converter").setMaster("spark://IP-ADDRESS:PORT")
  .set("spark.executor.memory", "900m") // to be below the available 1024 MB of default slave RAM
  .set("spark.memory.fraction", "0.2") // to get more usable heap space
  .set("spark.executor.cores", "2")
  .set("spark.executor.instances", "3")

Пример ошибки пространства кучи на этапе разбиения на листы (шаг 2):

org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 2.0 failed 1 times, most recent failure: Lost task 1.0 in stage 2.0 (TID 5, 192.168.0.2, executor 1): java.lang.OutOfMemoryError: Java heap space
        at scala.collection.mutable.ArrayBuilder$ofByte.mkArray(ArrayBuilder.scala:128)
        at scala.collection.mutable.ArrayBuilder$ofByte.resize(ArrayBuilder.scala:134)
        at scala.collection.mutable.ArrayBuilder$ofByte.sizeHint(ArrayBuilder.scala:139)
        at scala.collection.IndexedSeqOptimized$class.slice(IndexedSeqOptimized.scala:115)
        at scala.collection.mutable.ArrayOps$ofByte.slice(ArrayOps.scala:198)
        at geotrellis.util.StreamingByteReader.getBytes(StreamingByteReader.scala:98)
        at geotrellis.raster.io.geotiff.LazySegmentBytes.getBytes(LazySegmentBytes.scala:104)
        at geotrellis.raster.io.geotiff.LazySegmentBytes.readChunk(LazySegmentBytes.scala:81)
        at geotrellis.raster.io.geotiff.LazySegmentBytes$$anonfun$getSegments$1.apply(LazySegmentBytes.scala:99)
        at geotrellis.raster.io.geotiff.LazySegmentBytes$$anonfun$getSegments$1.apply(LazySegmentBytes.scala:99)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at scala.collection.TraversableOnce$class.reduceLeft(TraversableOnce.scala:185)
        at scala.collection.AbstractIterator.reduceLeft(Iterator.scala:1336)
        at org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$15.apply(RDD.scala:1012)
        at org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$15.apply(RDD.scala:1010)
        at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:2118)
        at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:2118)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2119)
        at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1026)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
        at org.apache.spark.rdd.RDD.reduce(RDD.scala:1008)
        at geotrellis.spark.TileLayerMetadata$.collectMetadataWithCRS(TileLayerMetadata.scala:147)
        at geotrellis.spark.TileLayerMetadata$.fromRdd(TileLayerMetadata.scala:281)
        at geotrellis.spark.package$withCollectMetadataMethods.collectMetadata(package.scala:212)
        ...

Обновление:

Я извлек пример из своего кода и загрузил его в репозиторий на https://gitlab.com/hwuerz/geotrellis-spark-example. Вы можете запустить пример локально, используя sbt run и выбрав класс demo.HelloGeotrellis. Это создаст плитки для крошечного входного набора данных example.tif в соответствии с нашим определением макета, начиная с уровня масштабирования 20 (с использованием двух ядер по умолчанию, его можно настроить в файле HelloGeotrellis.scala ~, если уровень 20 все еще работает, он будет наиболее скорее всего не удастся использовать более высокие значения для bottomLayer).

Чтобы запустить код в Spark Cluster, я использую следующую команду:

 `sbt package && bash submit.sh --dataLocation /mnt/glusterfs/example.tif --bottomLayer 20 --topLayer 10 --cesiumTerrainDir /mnt/glusterfs/terrain/ --sparkMaster spark://192.168.0.8:7077`

Где submit.sh в основном работает spark-submit (см. Файл в репо).

example.tif включено в репозиторий в каталоге DebugFiles. В моей настройке файл распространяется через glusterfs, поэтому путь указывает на это местоположение. cesiumTerrainDir - это просто каталог, в котором мы храним наш сгенерированный вывод.

Мы думаем, что основная проблема может заключаться в том, что при использовании данных вызовов API геотреллис загружает в память полную структуру макета, которая слишком велика для более высоких уровней масштабирования. Есть ли способ избежать этого?

...