Я пытался объединить зажигание и искру.Цель моего приложения - записывать и считывать искровые данные в / из ignite.Тем не менее, я сталкиваюсь с несколькими проблемами с большими наборами данных (> 200 000 000 строк).
У меня есть 6-узловый кластер Ignite, работающий на YARN.Он имеет 160 Гб памяти и 12 ядер.Я пытаюсь сохранить фрейм данных с помощью spark (около 20 ГБ необработанных текстовых данных) в кэше Ignite (резервная копия с разделами 1):
def main(args: Array[String]) {
val ignite = setupIgnite
closeAfter(ignite) { _ ⇒
implicit val spark: SparkSession = SparkSession.builder
.appName("Ignite Benchmark")
.getOrCreate()
val customer = readDF("csv", "|", Schemas.customerSchema, "hdfs://master.local:8020/apps/hive/warehouse/ssbplus100/customer")
val part = readDF("csv", "|", Schemas.partSchema, "hdfs:// master.local:8020/apps/hive/warehouse/ssbplus100/part")
val supplier = readDF("csv", "|", Schemas.supplierSchema, "hdfs:// master.local:8020/apps/hive/warehouse/ssbplus100/supplier")
val dateDim = readDF("csv", "|", Schemas.dateDimSchema, "hdfs:// master.local:8020/apps/hive/warehouse/ssbplus100/date_dim")
val lineorder = readDF("csv", "|", Schemas.lineorderSchema, "hdfs:// master.local:8020/apps/hive/warehouse/ssbplus100/lineorder")
writeDF(customer, "customer", List("custkey"), TEMPLATES.REPLICATED)
writeDF(part, "part", List("partkey"), TEMPLATES.REPLICATED)
writeDF(supplier, "supplier", List("suppkey"), TEMPLATES.REPLICATED)
writeDF(dateDim, "date_dim", List("datekey"), TEMPLATES.REPLICATED)
writeDF(lineorder.limit(200000000), "lineorder", List("orderkey, linenumber"), TEMPLATES.NO_BACKUP)
}
}
В какой-то момент приложение spark получает эту ошибку:
class org.apache.ignite.internal.mem.IgniteOutOfMemoryException: Out of memory in data region [name=default, initSize=256.0 MiB, maxSize=12.6 GiB, persistenceEnabled=false] Try the following:
^-- Increase maximum off-heap memory size (DataRegionConfiguration.maxSize)
^-- Enable Ignite persistence (DataRegionConfiguration.persistenceEnabled)
^-- Enable eviction or expiration policies
at org.apache.ignite.internal.pagemem.impl.PageMemoryNoStoreImpl.allocatePage(PageMemoryNoStoreImpl.java:304)
at org.apache.ignite.internal.processors.cache.persistence.freelist.AbstractFreeList.allocateDataPage(AbstractFreeList.java:463)
at org.apache.ignite.internal.processors.cache.persistence.freelist.AbstractFreeList.insertDataRow(AbstractFreeList.java:501)
at org.apache.ignite.internal.processors.cache.persistence.RowStore.addRow(RowStore.java:97)
at org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManagerImpl$CacheDataStoreImpl.createRow(IgniteCacheOffheapManagerImpl.java:1302)
at org.apache.ignite.internal.processors.cache.GridCacheMapEntry$UpdateClosure.call(GridCacheMapEntry.java:4426)
at org.apache.ignite.internal.processors.cache.GridCacheMapEntry$UpdateClosure.call(GridCacheMapEntry.java:4371)
at org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree$Invoke.invokeClosure(BPlusTree.java:3083)
at org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree$Invoke.access$6200(BPlusTree.java:2977)
at org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree.invokeDown(BPlusTree.java:1726)
at org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree.invokeDown(BPlusTree.java:1703)
at org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree.invokeDown(BPlusTree.java:1703)
at org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree.invoke(BPlusTree.java:1610)
at org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManagerImpl$CacheDataStoreImpl.invoke(IgniteCacheOffheapManagerImpl.java:1249)
at org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManagerImpl.invoke(IgniteCacheOffheapManagerImpl.java:352)
at org.apache.ignite.internal.processors.cache.GridCacheMapEntry.storeValue(GridCacheMapEntry.java:3602)
at org.apache.ignite.internal.processors.cache.GridCacheMapEntry.initialValue(GridCacheMapEntry.java:2774)
at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$IsolatedUpdater.receive(DataStreamerImpl.java:2125)
at org.apache.ignite.internal.processors.datastreamer.DataStreamerUpdateJob.call(DataStreamerUpdateJob.java:140)
at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.localUpdate(DataStreamProcessor.java:400)
at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.processRequest(DataStreamProcessor.java:305)
at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.access$000(DataStreamProcessor.java:60)
at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor$1.onMessage(DataStreamProcessor.java:90)
at org.apache.ignite.internal.managers.communication.GridIoManager.invokeListener(GridIoManager.java:1556)
at org.apache.ignite.internal.managers.communication.GridIoManager.processRegularMessage0(GridIoManager.java:1184)
at org.apache.ignite.internal.managers.communication.GridIoManager.access$4200(GridIoManager.java:125)
at org.apache.ignite.internal.managers.communication.GridIoManager$9.run(GridIoManager.java:1091)
at org.apache.ignite.internal.util.StripedExecutor$Stripe.run(StripedExecutor.java:511)
at java.lang.Thread.run(Thread.java:748)
Я думаю, что проблема заключается в том, что сервер зажигания запускается перед сеансом зажигания, как в официальных примерах воспламенения.Этот сервер запускает кеширование данных, которые я записываю в кэш воспламенения, и превышает его максимальный размер области по умолчанию (12 ГБ, что отличается от 20 ГБ, определенных для моего кластера пряжи).Тем не менее, я не понимаю, как примеры и документация говорят нам создать сервер зажигания до контекста спарк (и сеанс, который я предполагаю).Я понимаю, что без этого приложение будет зависать после завершения всех заданий spark, но я не понимаю логику наличия сервера в приложении spark, который запускает кеширование данных.Я очень озадачен этой концепцией, и на данный момент я настроил этот экземпляр ignite внутри spark, чтобы он был клиентом.
Это странное поведение, поскольку все мои узлы зажигания (работающие на YARN) имеют 20 ГБ, определенные для региона по умолчанию (я изменил его и проверил).Это указывает на то, что ошибка должна исходить от серверов воспламенения, запущенных в Spark (я думаю, что это один драйвер и один на одного работника), поскольку я не изменил размер региона по умолчанию в файле ignite-config.xml приложения spark(по умолчанию 12 ГБ, как показывает ошибка).Однако имеет ли это смысл?Должен ли Spark выбросить эту ошибку, являясь единственной целью чтения и записи данных из / для зажигания?Участвует ли Spark в кэшировании каких-либо данных и означает ли это, что я должен установить режим клиента в файле ignite-config.xml моего приложения, несмотря на тот факт, что официальные примеры не используют режим клиента?
С наилучшими пожеланиями,Carlos