Наша главная цель - чтобы мы выполняли операции с большим объемом входных данных (около 80 ГБ). Проблема в том, что даже для небольших наборов данных мы часто получаем пространство кучи Java или другие ошибки, связанные с памятью.
Нашим временным решением было просто указать более высокий максимальный размер кучи (используя -Xmx
локально или установив spark.executor.memory
и spark.driver.memory
для нашего экземпляра искры), но это, похоже, не обобщает, мы все равно сталкиваемся с ошибками для больших наборов данных или более высоких уровней масштабирования.
Для лучшего понимания, вот основная концепция того, что мы делаем с нашими данными:
Загрузить данные, используя HadoopGeoTiffRDD.spatial(new Path(path))
Сопоставить данные с плитками некоторого уровня масштабирования
val extent = geotrellis.proj4.CRS.fromName("EPSG:4326").worldExtent
val layout = layoutForZoom(zoom, extent)
val metadata: TileLayerMetadata[SpatialKey] = dataSet.collectMetadata[SpatialKey](layout)
val rdd = ContextRDD(dataSet.tileToLayout[SpatialKey](metadata), metadata)
Где layoutForZoom
в основном совпадает с geotrellis.spark.tiling.ZoomedLayoutScheme.layoutForZoom
Затем мы выполняем некоторые операции над записями rdd, используя rdd.map
и rdd.foreach
для отображенных rdds.
Мы агрегируем результаты четырех плиток, которые соответствуют одной плитке с более высоким уровнем масштабирования, используя groupByKey
Переходите к 3, пока мы не достигнем определенного уровня масштабирования
Цель будет заключаться в следующем: учитывая ограничение памяти в X ГБ, разделить и обработать данные таким образом, чтобы мы потребляли не более X ГБ.
Похоже, что разбиение набора данных по tileToLayout
уже занимает слишком много памяти при более высоких уровнях масштабирования (даже для очень маленьких наборов данных). Существуют ли альтернативы для разбиения на листы и загрузки данных в соответствии с неким LayoutDefinition? Насколько мы понимаем, HadoopGeoTiffRDD.spatial
уже разбивает набор данных на небольшие области, которые затем делятся на плитки на tileToLayout
. Можно ли как-то напрямую загрузить набор данных, соответствующий LayoutDefinition?
В нашем конкретном сценарии у нас есть 3 рабочих с 2 ГБ ОЗУ и 2 ядрами каждый. На одном из них также работает спаркмастер, который получает свою работу через spark-submit из экземпляра драйвера. Мы попробовали конфигурации, подобные этой:
val conf = new SparkConf().setAppName("Converter").setMaster("spark://IP-ADDRESS:PORT")
.set("spark.executor.memory", "900m") // to be below the available 1024 MB of default slave RAM
.set("spark.memory.fraction", "0.2") // to get more usable heap space
.set("spark.executor.cores", "2")
.set("spark.executor.instances", "3")
Пример ошибки пространства кучи на этапе разбиения на листы (шаг 2):
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 2.0 failed 1 times, most recent failure: Lost task 1.0 in stage 2.0 (TID 5, 192.168.0.2, executor 1): java.lang.OutOfMemoryError: Java heap space
at scala.collection.mutable.ArrayBuilder$ofByte.mkArray(ArrayBuilder.scala:128)
at scala.collection.mutable.ArrayBuilder$ofByte.resize(ArrayBuilder.scala:134)
at scala.collection.mutable.ArrayBuilder$ofByte.sizeHint(ArrayBuilder.scala:139)
at scala.collection.IndexedSeqOptimized$class.slice(IndexedSeqOptimized.scala:115)
at scala.collection.mutable.ArrayOps$ofByte.slice(ArrayOps.scala:198)
at geotrellis.util.StreamingByteReader.getBytes(StreamingByteReader.scala:98)
at geotrellis.raster.io.geotiff.LazySegmentBytes.getBytes(LazySegmentBytes.scala:104)
at geotrellis.raster.io.geotiff.LazySegmentBytes.readChunk(LazySegmentBytes.scala:81)
at geotrellis.raster.io.geotiff.LazySegmentBytes$$anonfun$getSegments$1.apply(LazySegmentBytes.scala:99)
at geotrellis.raster.io.geotiff.LazySegmentBytes$$anonfun$getSegments$1.apply(LazySegmentBytes.scala:99)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.reduceLeft(TraversableOnce.scala:185)
at scala.collection.AbstractIterator.reduceLeft(Iterator.scala:1336)
at org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$15.apply(RDD.scala:1012)
at org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$15.apply(RDD.scala:1010)
at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:2118)
at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:2118)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2119)
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1026)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.reduce(RDD.scala:1008)
at geotrellis.spark.TileLayerMetadata$.collectMetadataWithCRS(TileLayerMetadata.scala:147)
at geotrellis.spark.TileLayerMetadata$.fromRdd(TileLayerMetadata.scala:281)
at geotrellis.spark.package$withCollectMetadataMethods.collectMetadata(package.scala:212)
...
Обновление:
Я извлек пример из своего кода и загрузил его в репозиторий на https://gitlab.com/hwuerz/geotrellis-spark-example. Вы можете запустить пример локально, используя sbt run
и выбрав класс demo.HelloGeotrellis
. Это создаст плитки для крошечного входного набора данных example.tif
в соответствии с нашим определением макета, начиная с уровня масштабирования 20 (с использованием двух ядер по умолчанию, его можно настроить в файле HelloGeotrellis.scala
~, если уровень 20 все еще работает, он будет наиболее скорее всего не удастся использовать более высокие значения для bottomLayer
).
Чтобы запустить код в Spark Cluster, я использую следующую команду:
`sbt package && bash submit.sh --dataLocation /mnt/glusterfs/example.tif --bottomLayer 20 --topLayer 10 --cesiumTerrainDir /mnt/glusterfs/terrain/ --sparkMaster spark://192.168.0.8:7077`
Где submit.sh
в основном работает spark-submit
(см. Файл в репо).
example.tif
включено в репозиторий в каталоге DebugFiles
. В моей настройке файл распространяется через glusterfs, поэтому путь указывает на это местоположение. cesiumTerrainDir
- это просто каталог, в котором мы храним наш сгенерированный вывод.
Мы думаем, что основная проблема может заключаться в том, что при использовании данных вызовов API геотреллис загружает в память полную структуру макета, которая слишком велика для более высоких уровней масштабирования. Есть ли способ избежать этого?