Apache Ignite + Spark Dataframes: клиент против сервера сомневается - PullRequest
0 голосов
/ 10 декабря 2018

Я пытался объединить зажигание и искру.Цель моего приложения - записывать и считывать искровые данные в / из ignite.Тем не менее, я сталкиваюсь с несколькими проблемами с большими наборами данных (> 200 000 000 строк).

У меня есть 6-узловый кластер Ignite, работающий на YARN.Он имеет 160 Гб памяти и 12 ядер.Я пытаюсь сохранить фрейм данных с помощью spark (около 20 ГБ необработанных текстовых данных) в кэше Ignite (резервная копия с разделами 1):

def main(args: Array[String]) {
    val ignite = setupIgnite

    closeAfter(ignite) { _ ⇒

      implicit val spark: SparkSession = SparkSession.builder
        .appName("Ignite Benchmark")
        .getOrCreate()

      val customer = readDF("csv", "|", Schemas.customerSchema, "hdfs://master.local:8020/apps/hive/warehouse/ssbplus100/customer")
      val part = readDF("csv", "|", Schemas.partSchema, "hdfs:// master.local:8020/apps/hive/warehouse/ssbplus100/part")
      val supplier = readDF("csv", "|", Schemas.supplierSchema, "hdfs:// master.local:8020/apps/hive/warehouse/ssbplus100/supplier")
      val dateDim = readDF("csv", "|", Schemas.dateDimSchema, "hdfs:// master.local:8020/apps/hive/warehouse/ssbplus100/date_dim")
      val lineorder = readDF("csv", "|", Schemas.lineorderSchema, "hdfs:// master.local:8020/apps/hive/warehouse/ssbplus100/lineorder")

      writeDF(customer, "customer", List("custkey"), TEMPLATES.REPLICATED)
      writeDF(part, "part", List("partkey"), TEMPLATES.REPLICATED)
      writeDF(supplier, "supplier", List("suppkey"), TEMPLATES.REPLICATED)
      writeDF(dateDim, "date_dim", List("datekey"), TEMPLATES.REPLICATED)
      writeDF(lineorder.limit(200000000), "lineorder", List("orderkey, linenumber"), TEMPLATES.NO_BACKUP)

    }
  }

В какой-то момент приложение spark получает эту ошибку:

    class org.apache.ignite.internal.mem.IgniteOutOfMemoryException: Out of memory in data region [name=default, initSize=256.0 MiB, maxSize=12.6 GiB, persistenceEnabled=false] Try the following:
  ^-- Increase maximum off-heap memory size (DataRegionConfiguration.maxSize)
  ^-- Enable Ignite persistence (DataRegionConfiguration.persistenceEnabled)
  ^-- Enable eviction or expiration policies
        at org.apache.ignite.internal.pagemem.impl.PageMemoryNoStoreImpl.allocatePage(PageMemoryNoStoreImpl.java:304)
        at org.apache.ignite.internal.processors.cache.persistence.freelist.AbstractFreeList.allocateDataPage(AbstractFreeList.java:463)
        at org.apache.ignite.internal.processors.cache.persistence.freelist.AbstractFreeList.insertDataRow(AbstractFreeList.java:501)
        at org.apache.ignite.internal.processors.cache.persistence.RowStore.addRow(RowStore.java:97)
        at org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManagerImpl$CacheDataStoreImpl.createRow(IgniteCacheOffheapManagerImpl.java:1302)
        at org.apache.ignite.internal.processors.cache.GridCacheMapEntry$UpdateClosure.call(GridCacheMapEntry.java:4426)
        at org.apache.ignite.internal.processors.cache.GridCacheMapEntry$UpdateClosure.call(GridCacheMapEntry.java:4371)
        at org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree$Invoke.invokeClosure(BPlusTree.java:3083)
        at org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree$Invoke.access$6200(BPlusTree.java:2977)
        at org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree.invokeDown(BPlusTree.java:1726)
        at org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree.invokeDown(BPlusTree.java:1703)
        at org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree.invokeDown(BPlusTree.java:1703)
        at org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree.invoke(BPlusTree.java:1610)
        at org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManagerImpl$CacheDataStoreImpl.invoke(IgniteCacheOffheapManagerImpl.java:1249)
        at org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManagerImpl.invoke(IgniteCacheOffheapManagerImpl.java:352)
        at org.apache.ignite.internal.processors.cache.GridCacheMapEntry.storeValue(GridCacheMapEntry.java:3602)
        at org.apache.ignite.internal.processors.cache.GridCacheMapEntry.initialValue(GridCacheMapEntry.java:2774)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$IsolatedUpdater.receive(DataStreamerImpl.java:2125)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamerUpdateJob.call(DataStreamerUpdateJob.java:140)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.localUpdate(DataStreamProcessor.java:400)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.processRequest(DataStreamProcessor.java:305)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.access$000(DataStreamProcessor.java:60)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor$1.onMessage(DataStreamProcessor.java:90)
        at org.apache.ignite.internal.managers.communication.GridIoManager.invokeListener(GridIoManager.java:1556)
        at org.apache.ignite.internal.managers.communication.GridIoManager.processRegularMessage0(GridIoManager.java:1184)
        at org.apache.ignite.internal.managers.communication.GridIoManager.access$4200(GridIoManager.java:125)
        at org.apache.ignite.internal.managers.communication.GridIoManager$9.run(GridIoManager.java:1091)
        at org.apache.ignite.internal.util.StripedExecutor$Stripe.run(StripedExecutor.java:511)
        at java.lang.Thread.run(Thread.java:748)

Я думаю, что проблема заключается в том, что сервер зажигания запускается перед сеансом зажигания, как в официальных примерах воспламенения.Этот сервер запускает кеширование данных, которые я записываю в кэш воспламенения, и превышает его максимальный размер области по умолчанию (12 ГБ, что отличается от 20 ГБ, определенных для моего кластера пряжи).Тем не менее, я не понимаю, как примеры и документация говорят нам создать сервер зажигания до контекста спарк (и сеанс, который я предполагаю).Я понимаю, что без этого приложение будет зависать после завершения всех заданий spark, но я не понимаю логику наличия сервера в приложении spark, который запускает кеширование данных.Я очень озадачен этой концепцией, и на данный момент я настроил этот экземпляр ignite внутри spark, чтобы он был клиентом.

Это странное поведение, поскольку все мои узлы зажигания (работающие на YARN) имеют 20 ГБ, определенные для региона по умолчанию (я изменил его и проверил).Это указывает на то, что ошибка должна исходить от серверов воспламенения, запущенных в Spark (я думаю, что это один драйвер и один на одного работника), поскольку я не изменил размер региона по умолчанию в файле ignite-config.xml приложения spark(по умолчанию 12 ГБ, как показывает ошибка).Однако имеет ли это смысл?Должен ли Spark выбросить эту ошибку, являясь единственной целью чтения и записи данных из / для зажигания?Участвует ли Spark в кэшировании каких-либо данных и означает ли это, что я должен установить режим клиента в файле ignite-config.xml моего приложения, несмотря на тот факт, что официальные примеры не используют режим клиента?

С наилучшими пожеланиями,Carlos

1 Ответ

0 голосов
/ 10 декабря 2018

Во-первых, разъем Spark-Ignite уже подключается в режиме клиента .

Я собираюсь предположить, что у вас достаточно памяти, но вы можете следовать примеру в Планирование мощности руководство, чтобы быть уверенным.

Однако, я думаю, проблема в том, что вы слишком внимательно следите за примером приложения (!).Образец - чтобы быть автономным - включает в себя как сервер, так и клиент Spark.Если у вас уже есть кластер Ignite, вам не нужно запускать сервер в вашем клиенте Spark .

Это слегка взломанный пример из реального приложения (на Java, извините):

    try (SparkSession spark = SparkSession
        .builder()
        .appName("AppName")
        .master(sparkMaster)
        .config("spark.executor.extraClassPath", igniteClassPath())
        .getOrCreate()) {

        // Get source DataFrame
        DataSet<Row> results = ....

        results.write()
            .outputMode("append")
            .format(IgniteDataFrameSettings.FORMAT_IGNITE())
            .option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(), igniteCfgFile)
            .option(IgniteDataFrameSettings.OPTION_TABLE(), "Results")
            .option(IgniteDataFrameSettings.OPTION_STREAMER_ALLOW_OVERWRITE(), true)
            .option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS(), "name")
            .option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PARAMETERS(), "backups=1")
            .write();
    }

Я не тестировал, но вы должны понять: вам нужно указать URL-адрес файла конфигурации Ignite;он создает клиента для подключения к этому серверу за кулисами.

...