Сбой ансамбля распределенного конечного автомата zookeeper при обработке параллельных областей с ошибкой KeeperErrorCode = BadVersion - PullRequest
0 голосов
/ 14 мая 2018

Справочная информация:

Диаграмма: Диаграмма состояний uml Statemachine

У нас есть нормальный конечный автомат, как показано на диаграмме, которая контролирует Spring-BATCHмикросервис (развернутый на основе проекта «источник / процессор / приемник» потоков) для каждого запускаемого пакета.

Мы получаем последовательность вызовов REST для внутреннего запуска событий для каждого идентификатора пакета на машинном объекте соответствующего пакета.т. е. для идентификатора партии создается новый объект конечного автомата.

И каждая машина имеет n число параллельных областей (представляющих куски пружинной партии), как показано на схеме.

RESTвыполняемые вызовы используют многопоточную среду, в которой 2 одновременных вызова одного и того же batchId могут приходить для разных идентификаторов региона в состоянии BATCHPROCESSING.

До сих пор у нас был один узел (одна установка), на котором выполнялся этот микросервис конечного автомата, но теперь мы хотим развернуть его в нескольких экземплярах;принимать REST звонки.Для этого мы хотим представить распределенный конечный автомат.Ниже приведена конфигурация для запуска распределенного конечного автомата.

@Configuration
@EnableStateMachine
public  class StateMachineUMLWayConfiguration extends 
StateMachineConfigurerAdapter<String, String> {

..
..

@Override
public void configure(StateMachineModelConfigurer<String,String> model) 
throws Exception {
    model
        .withModel()
            .factory(stateMachineModelFactory());
}

@Bean
public StateMachineModelFactory<String,String> stateMachineModelFactory() {

    StorehubBatchUmlStateMachineModelFactory factory =null;

    try {
    factory = new StorehubBatchUmlStateMachineModelFactory
    (templateUMLInClasspath,stateMachineEnsemble());
    } catch (Exception e) {
    LOGGER.info("Config's State machine factory got exception 
    :"+factory);
    }
    LOGGER.info("Config's State machine factory method Called:"+factory);

factory.setStateMachineComponentResolver(stateMachineComponentResolver());
    return factory;
}


    @Override
    public void configure(StateMachineConfigurationConfigurer<String, 
String> 
    config) throws Exception {
    config
        .withDistributed()
            .ensemble(stateMachineEnsemble());
}

@Bean
public StateMachineEnsemble<String, String> stateMachineEnsemble() throws 
Exception {
    return new ZookeeperStateMachineEnsemble<String, String>(curatorClient(), "/batchfoo1", true, 512);
}

@Bean
    public CuratorFramework curatorClient() throws Exception {
        CuratorFramework client = 
CuratorFrameworkFactory.builder().defaultData(new byte[0])
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .connectString("localhost:2181").build();
        client.start();
        return client;
    }

Метод сборки StorehubBatchUmlStateMachineModelFactory:

    @Override
    public StateMachineModel<String, String> build(String batchChunkId) {

    Model model = null;
    try {
        model = UmlUtils.getModel(getResourceUri(resolveResource(batchChunkId)).getPath());
    } catch (IOException e) {
        throw new IllegalArgumentException("Cannot build model from resource " + resource + " or location " + location, e);
    }
    UmlModelParser parser = new UmlModelParser(model, this);
    DataHolder dataHolder = parser.parseModel();
    ConfigurationData<String, String> configurationData = new ConfigurationData<String, String>( null, new SyncTaskExecutor(),
            new ConcurrentTaskScheduler() , false, stateMachineEnsemble,
            new ArrayList<StateMachineListener<String, String>>(), false,
            null, null,
            null, null, false,
            null , batchChunkId, null,
            null ) ;
    return new DefaultStateMachineModel<String, String>(configurationData, dataHolder.getStatesData(), dataHolder.getTransitionsData());
}

Создан новый метод уровня пользовательского интерфейса службы вместо DefaultStateMachineService.acquireStateMachine(machineId)

@Override
public StateMachine<String, String> acquireDistributedStateMachine(String machineId, boolean start) {

    synchronized (distributedMachines) {
        DistributedStateMachine<String,String> distributedStateMachine = distributedMachines.get(machineId); 
        StateMachine<String,String> distMachineDelegateX = null;
        if (distributedStateMachine == null) { 

            StateMachine<String, String> machine = stateMachineFactory.getStateMachine(machineId);
            distributedStateMachine = (DistributedStateMachine<String, String>) machine;

        }
        distributedMachines.put(machineId, distributedStateMachine);

        return handleStart(distributedStateMachine, start);
    }
}

Проблема:

Теперь проблема в том, что микро-сервис, развернутый на одном экземпляре, работает успешно даже для событий, полученных им из многопоточной среды, где один поток сталкивается с вызовом события REST, принадлежащим к области 1, и одновременно приходит другой потокдля региона 2 той же партии.Машина работает синхронно, с успешной параллельной обработкой областей, до последнего состояния, т.е. BATCHCOMPLETED.Также мы проверили на стороне zookeeper, что, наконец, BATCHCOMPLETED STATE записывается в текущей версии узла.

Но, кроме 1-го экземпляра, когда мы храним тот же app-jar микро-сервиса, развернутый в каком-то другом месте, чтобы рассматривать его каквторой экземпляр микросервиса, который также теперь работает для приема вызовов REST событий (скажем, прослушивая другой порт tomcat 9002);где-то в середине где-то случайно.Этот сбой происходит случайным образом после запуска любого из событий в параллельных регионах и при внутреннем вызове ensemble.setState() при изменении состояния этого события.

Выдает следующую ошибку:

    [36mo.s.s.support.AbstractStateMachine      [0;39m [2m:[0;39m Interceptors threw exception, skipping state change

org.springframework.statemachine.StateMachineException: Error persisting data; nested exception is org.springframework.statemachine.StateMachineException: Error persisting data; nested exception is org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion
        at org.springframework.statemachine.zookeeper.ZookeeperStateMachineEnsemble.setState(ZookeeperStateMachineEnsemble.java:241) ~[spring-statemachine-zookeeper-2.0.1.RELEASE.jar!/:2.0.1.RELEASE]
        at org.springframework.statemachine.ensemble.DistributedStateMachine$LocalStateMachineInterceptor.preStateChange(DistributedStateMachine.java:209) ~[spring-statemachine-core-2.0.0.RELEASE.jar!/:2.0.0.RELEASE]
        at org.springframework.statemachine.support.StateMachineInterceptorList.preStateChange(StateMachineInterceptorList.java:101) ~[spring-statemachine-core-2.0.0.RELEASE.jar!/:2.0.0.RELEASE]
        at org.springframework.statemachine.support.AbstractStateMachine.callPreStateChangeInterceptors(AbstractStateMachine.java:859) [spring-statemachine-core-2.0.0.RELEASE.jar!/:2.0.0.RELEASE]
        at org.springframework.statemachine.support.AbstractStateMachine.switchToState(AbstractStateMachine.java:880) [spring-statemachine-core-2.0.0.RELEASE.jar!/:2.0.0.RELEASE]
        at org.springframework.statemachine.support.AbstractStateMachine.access$500(AbstractStateMachine.java:81) [spring-statemachine-core-2.0.0.RELEASE.jar!/:2.0.0.RELEASE]
        at org.springframework.statemachine.support.AbstractStateMachine$3.transit(AbstractStateMachine.java:335) [spring-statemachine-core-2.0.0.RELEASE.jar!/:2.0.0.RELEASE]
        at org.springframework.statemachine.support.DefaultStateMachineExecutor.handleTriggerTrans(DefaultStateMachineExecutor.java:286) [spring-statemachine-core-2.0.0.RELEASE.jar!/:2.0.0.RELEASE]
        at org.springframework.statemachine.support.DefaultStateMachineExecutor.handleTriggerTrans(DefaultStateMachineExecutor.java:211) [spring-statemachine-core-2.0.0.RELEASE.jar!/:2.0.0.RELEASE]
        at org.springframework.statemachine.support.DefaultStateMachineExecutor.processTriggerQueue(DefaultStateMachineExecutor.java:449) [spring-statemachine-core-2.0.0.RELEASE.jar!/:2.0.0.RELEASE]
        at org.springframework.statemachine.support.DefaultStateMachineExecutor.access$200(DefaultStateMachineExecutor.java:65) [spring-statemachine-core-2.0.0.RELEASE.jar!/:2.0.0.RELEASE]
        at org.springframework.statemachine.support.DefaultStateMachineExecutor$1.run(DefaultStateMachineExecutor.java:323) [spring-statemachine-core-2.0.0.RELEASE.jar!/:2.0.0.RELEASE]
        at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) [spring-core-4.3.13.RELEASE.jar!/:4.3.13.RELEASE]
        at org.springframework.statemachine.support.DefaultStateMachineExecutor.scheduleEventQueueProcessing(DefaultStateMachineExecutor.java:352) [spring-statemachine-core-2.0.0.RELEASE.jar!/:2.0.0.RELEASE]
        at org.springframework.statemachine.support.DefaultStateMachineExecutor.execute(DefaultStateMachineExecutor.java:163) [spring-statemachine-core-2.0.0.RELEASE.jar!/:2.0.0.RELEASE]
        at org.springframework.statemachine.support.AbstractStateMachine.sendEventInternal(AbstractStateMachine.java:603) [spring-statemachine-core-2.0.0.RELEASE.jar!/:2.0.0.RELEASE]
        at org.springframework.statemachine.support.AbstractStateMachine.sendEvent(AbstractStateMachine.java:218) [spring-statemachine-core-2.0.0.RELEASE.jar!/:2.0.0.RELEASE]
        at org.springframework.statemachine.ensemble.DistributedStateMachine.sendEvent(DistributedStateMachine.java:108) 
..skipping Lines....
Caused by: org.springframework.statemachine.StateMachineException: Error persisting data; nested exception is org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion
    at org.springframework.statemachine.zookeeper.ZookeeperStateMachinePersist.write(ZookeeperStateMachinePersist.java:113) ~[spring-statemachine-zookeeper-2.0.1.RELEASE.jar!/:2.0.1.RELEASE]
    at org.springframework.statemachine.zookeeper.ZookeeperStateMachinePersist.write(ZookeeperStateMachinePersist.java:50) ~[spring-statemachine-zookeeper-2.0.1.RELEASE.jar!/:2.0.1.RELEASE]
    at org.springframework.statemachine.zookeeper.ZookeeperStateMachineEnsemble.setState(ZookeeperStateMachineEnsemble.java:235) ~[spring-statemachine-zookeeper-2.0.1.RELEASE.jar!/:2.0.1.RELEASE]
    ... 73 common frames omitted
Caused by: org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion
at org.apache.zookeeper.KeeperException.create(KeeperException.java:115) ~[zookeeper-3.4.8.jar!/:3.4.8--1]
at org.apache.zookeeper.ZooKeeper.multiInternal(ZooKeeper.java:1006) ~[zookeeper-3.4.8.jar!/:3.4.8--1]
at org.apache.zookeeper.ZooKeeper.multi(ZooKeeper.java:910) ~[zookeeper-3.4.8.jar!/:3.4.8--1]
at org.apache.curator.framework.imps.CuratorTransactionImpl.doOperation(CuratorTransactionImpl.java:159)

Вопрос:
1. Так как вышеупомянутая конфигурация нуждается в чем-то большем, чтобы избежать этого исключения, упомянутого выше ?? Потому что оба экземпляра микросервиса конечного автомата былипротестирован случай, когда они оба подключались к одному и тому же экземпляру, т.е. к одной и той же строке .connectString("localhost:2181").build(), или случай, когда они были сделаны для подключения к различным экземплярам zookeeper (то есть 'localhost: 2181', 'localhost: 2182').

В обоих случаях во время обработки ансамбля конечного автомата возникает одно и то же исключение из BAD VERSION.

2. Также, если пакеты будут работать параллельно, поэтому для их работы потребуется создать соответствующие машины.параллельно в конце микро-службы конечного автомата. Итак, технически новый конечный автомат нам нужен для нового batchId, работающего одновременно.Но, глядя на ZookeeperStateMachineEnsemble, кажется, что один путь znode ассоциируется с одним ансамблем всякий раз, когда объект ensemble создается один раз в основном классе конфигурации («StateMachineUMLWayConfiguration»).

Значит, ожидается, что будет использоваться только этот экземпляр одноэлементного ансамбля?Разве нельзя создавать несколько ансамблей во время выполнения, ссылаясь на разные пути znode, запущенные параллельно, чтобы записать их соответствующие состояния распределенного автомата в соответствующие пути znode ??

a. Поскольку пакеты, работающие параллельно, потребуют создания отдельных путей znode. Таким образом, из-за нашей попытки сохранить отдельный путь znode для каждой партии нам нужен отдельный ансамбль для каждой машины пакета. Но похоже, что он попадает в состояние блокировки при получении соединения с znode через кураторский клиент.

b. Вызов REST, инициированный для запуска события, не завершается, поскольку полученная машина застревает в ансамбле для подключения.

Заранее спасибо.

...