Spark с Ignite - приложение продолжает работать - PullRequest
0 голосов
/ 23 октября 2018

Я пытаюсь писать / читать из / в Ignite в приложении Spark.Кажется, что и чтение, и запись работают нормально, но приложение не заканчивается, даже если оно завершает то, что должно было сделать.

В частности, журнал продолжает печатать те же строки:

2018-10-23 09:08:12 DEBUG GridCachePartitionExchangeManager:558 - Before waiting for exchange futures [futs[], worker=GridWorker [name=partition-exchanger, igniteInstanceName=null, finished=false, hashCode=215498915, interrupted=false, runner=exchange-worker-#35]]
2018-10-23 09:08:12 DEBUG GridTimeoutProcessor:558 - Timeout has occurred [obj=CancelableTask [id=2374d20a661-967c7da2-84ed-46d9-9ec1-fd64ab02e16a, endTime=1540285692400, period=2000, cancel=false, task=org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing$13@2e49ebc1], process=true]
2018-10-23 09:08:14 DEBUG GridTimeoutProcessor:558 - Timeout has occurred [obj=CancelableTask [id=3374d20a661-967c7da2-84ed-46d9-9ec1-fd64ab02e16a, endTime=1540285694375, period=3000, cancel=false, task=org.apache.ignite.internal.processors.query.GridQueryProcessor$2@579e2347], process=true]
2018-10-23 09:08:14 DEBUG GridTimeoutProcessor:558 - Timeout has occurred [obj=CancelableTask [id=2374d20a661-967c7da2-84ed-46d9-9ec1-fd64ab02e16a, endTime=1540285694400, period=2000, cancel=false, task=org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing$13@2e49ebc1], process=true]
2018-10-23 09:08:14 DEBUG GridTimeoutProcessor:558 - Timeout has occurred [obj=CancelableTask [id=4374d20a661-967c7da2-84ed-46d9-9ec1-fd64ab02e16a, endTime=1540285694437, period=3000, cancel=false, task=MetricsUpdater [prevGcTime=337, prevCpuTime=16515, super=org.apache.ignite.internal.managers.discovery.GridDiscoveryManager$MetricsUpdater@1fad637f]], process=true]
2018-10-23 09:08:15 DEBUG TcpCommunicationSpi:558 - Balancing data [min0=0, minIdx=0, max0=-1, maxIdx=-1]
2018-10-23 09:08:16 DEBUG GridTimeoutProcessor:558 - Timeout has occurred [obj=CancelableTask [id=1374d20a661-967c7da2-84ed-46d9-9ec1-fd64ab02e16a, endTime=1540285696182, period=10000, cancel=false, task=org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing$12@1c86b2f2], process=true]
2018-10-23 09:08:16 DEBUG ClientListenerProcessor:558 - Balancing data [min0=0, minIdx=0, max0=-1, maxIdx=-1]
2018-10-23 09:08:16 DEBUG GridTimeoutProcessor:558 - Timeout has occurred [obj=CancelableTask [id=2374d20a661-967c7da2-84ed-46d9-9ec1-fd64ab02e16a, endTime=1540285696405, period=2000, cancel=false, task=org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing$13@2e49ebc1], process=true]
2018-10-23 09:08:17 DEBUG GridCachePartitionExchangeManager:558 - Before waiting for exchange futures [futs[], worker=GridWorker [name=partition-exchanger, igniteInstanceName=null, finished=false, hashCode=215498915, interrupted=false, runner=exchange-worker-#35]]

и т. д.

Время от времени он также печатает:

2018-10-23 09:07:59 INFO  IgniteKernal:566 - 
Metrics for local node (to disable set 'metricsLogFrequency' to 0)
    ^-- Node [id=a7f30542, uptime=00:01:00.019]
    ^-- H/N/C [hosts=2, nodes=4, CPUs=4]
    ^-- CPU [cur=1.67%, avg=9.47%, GC=0%]
    ^-- PageMemory [pages=0]
    ^-- Heap [used=278MB, free=69.36%, comm=478MB]
    ^-- Non heap [used=129MB, free=-1%, comm=130MB]
    ^-- Outbound messages queue [size=0]
    ^-- Public thread pool [active=0, idle=0, qSize=0]
    ^-- System thread pool [active=0, idle=4, qSize=0]

Я просто делаю очень простые операции чтения / записи в Ignite, и когда я настраиваю ридер/ Writer с

.option(IgniteDataFrameSettings.OPTION_CONFIG_FILE, path)

path соответствует этому файлу конфигурации:

<beans xmlns="http://www.springframework.org/schema/beans"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://www.springframework.org/schema/beans
                       http://www.springframework.org/schema/beans/spring-beans.xsd">
                       <bean class="org.apache.ignite.configuration.IgniteConfiguration">
                         <!-- Enabling Apache Ignite native persistence. -->
                         <property name="dataStorageConfiguration">
                           <bean class="org.apache.ignite.configuration.DataStorageConfiguration">
                             <property name="defaultDataRegionConfiguration">
                               <bean class="org.apache.ignite.configuration.DataRegionConfiguration">
                                 <property name="persistenceEnabled" value="true"/>
                               </bean>
                             </property>
                           </bean>
                         </property>
                       </bean>
</beans>

ОБНОВЛЕНИЕ:

Настройкаследующий.В настоящее время я использую Vagrant + Ansible + VirtualBox для запуска трех машин.Первый управляет искровым мастером, а остальные управляют искровыми рабочими.Ignite настроен на всех компьютерах, но он запускается с использованием сценария ignite.sh только на рабочих.Он настроен с включенным постоянством, поэтому кластер также активируется путем выдачи control.sh --activate.

Все работает нормально, и я могу по запросу события запрашивать данные с помощью API REST, единственное, что, кажется, не работает, этоэта вещь поддержания приложения в рабочем состоянии.

ОБНОВЛЕНИЕ 2:

Предложение @StephenDarlington частично сработало.Закрытие контекста Spark в конце приложения заставляет исполнителя завершить задачу.С другой стороны, из пользовательского интерфейса я теперь вижу, что 0 приложений работают, но драйвер все еще находится в рабочем состоянии.

Вот стандартный вывод программы драйвера:

2018-10-26 11:05:37 DEBUG G:558 - Ignite instance stopped ok: null
2018-10-26 11:05:37 INFO  StandaloneSchedulerBackend:54 - Shutting down all executors
2018-10-26 11:05:37 INFO  CoarseGrainedSchedulerBackend$DriverEndpoint:54 - Asking each executor to shut down
2018-10-26 11:05:37 INFO  MapOutputTrackerMasterEndpoint:54 - MapOutputTrackerMasterEndpoint stopped!
2018-10-26 11:05:37 INFO  MemoryStore:54 - MemoryStore cleared
2018-10-26 11:05:37 INFO  BlockManager:54 - BlockManager stopped
2018-10-26 11:05:37 INFO  BlockManagerMaster:54 - BlockManagerMaster stopped
2018-10-26 11:05:37 INFO  OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:54 - OutputCommitCoordinator stopped!
2018-10-26 11:05:37 INFO  SparkContext:54 - Successfully stopped SparkContext
2018-10-26 11:05:39 DEBUG PoolThreadCache:81 - Freed 25 thread-local buffer(s) from thread: shuffle-server-7-2
2018-10-26 11:05:39 DEBUG PoolThreadCache:81 - Freed 31 thread-local buffer(s) from thread: rpc-server-5-1
2018-10-26 11:05:39 DEBUG PoolThreadCache:81 - Freed 4 thread-local buffer(s) from thread: rpc-server-5-2

1 Ответ

0 голосов
/ 24 октября 2018

Как (не) Spark управляет серверами Ignite

Итак, вы запускаете интеграцию Ignite со Spark в Автономный режим .

В этом режиме запускаются узлы Igniteнезависимо от Spark, Spark не контролирует жизненный цикл Ignite.Сравните его с любой другой базой данных - приложения не запускают и не останавливают MySQL, MySQL работает самостоятельно.

Для иллюстрации сравните его со встроенным режимом ( НЕ используйте этот режим, он устарел и в конечном итоге будет удален ).При этом Spark сам запускает Ignite-серверы на каждом работнике и останавливает их, когда это сделано.Это несколько (хотя и не полностью) похоже на приложение, работающее с SQLite - не существует сервера SQLite, работающего вне приложения и управляемого отдельно.

Как выключить Ignite кластер вручную

Если вы хотитечтобы убить Ignite узлы, когда вы закончите с Spark, вы должны сделать это сами.Есть несколько вариантов.

Использование kill

Да, просто так, старый добрый kill <pid>.Лучше не использовать kill -9, чтобы Ignite мог поймать сигнал и выполнить некоторую очистку, хотя он должен запускаться даже после kill -9, если ваш walMode равен FSYNC или LOG_ONLY.

Youможет каким-то образом вызвать kill из приложения Spark во время его закрытия.

Использование Ignition::stop и IgniteCompute

Существует способ, позволяющий убить удаленный узел с помощью кода Java.

Ignition::stop метод останавливает локальный узел Ignite.Вы можете выполнить это на каждом узле удаленно через IgniteCompute.Просто не забудьте создать новый поток из задачи, которую вы отправляете, - попытка остановить Ignite от самой задачи - плохая идея.

Код будет выглядеть примерно так (не проверено!):

// Start a client node. Pass an appropriate configuration to connect to the cluster.
Ignition.setClientMode(true);
Ignite clusterKiller = Ignition.start();

// Stop all remote nodes.
clusterKiller.compute(clusterKiller.cluster().forRemotes()).broadcast(() -> {
    // Use a new thread to avoid deadlock.
    new Thread(() -> {
        Ignition.stop(false);
    }).start();
});

// Stop the client node.
clusterKiller.close();
...