java, клиент OrbitzWorld consul - Как управлять сессиями и блокировками? - PullRequest
0 голосов
/ 21 февраля 2020


Используя java Консульский клиент OrbitzWorld, я пытаюсь синхронизировать несколько экземпляров моего приложения java с помощью метода acquireLock.

Мой код пока:
Зарегистрируйте приложение в качестве консула:

private void registerService(Config config) {
        String serviceId = config.getService().getId();
        String serviceName = config.getService().getName();
        long ttl = config.getService().getTtl();
        AgentClient agentClient = client.agentClient();
        Registration service = ImmutableRegistration.builder()
                .id(serviceId)
                .name(serviceName)
                .check(Registration.RegCheck.ttl(ttl))
                .build();
        agentClient.register(service);
        new HeartBeater(agentClient, serviceId, ttl).start();
    }

HeartBeater:

@Override
    public void run() {
        while(true) {
            try {
                client.pass(serviceId);
                Thread.sleep((Math.max(ttl / 2, 1)));
            } catch (NotRegisteredException | InterruptedException e) {}
        }
    }

Приведенный выше код работает и сервис успешно обновляется в консуле.
Теперь мне интересно о реализации блокировки.

То, что я написал до сих пор:

public boolean amILeader() {
    // return if current java app is leader
}

private String createSession() {
    final Session session = ImmutableSession.builder().name(config.getService().getName()).build();
    return client.sessionClient().createSession(session).getId();
}

private void watchLeaderLockStateChange() {
    KeyValueClient keyValueClient = client.keyValueClient();
    KVCache kvCache = KVCache.newCache(keyValueClient, Constants.LEADER_LOCK_KEY, config.getService().getWatchKey());
    kvCache.addListener(map -> {
        Value value = map.get(Constants.LEADER_LOCK_KEY);
        if(!value.getSession().isPresent()) {
            keyValueClient.acquireLock(Constants.LEADER_LOCK_KEY, ???); //create new session here ???
        }
    });
    kvCache.start();
}

Я застрял здесь, так как не понимаю теорию и не нашел ничего полезного в документации.

Мои вопросы:

  • является ли сеанс необходимым для синхронизации с помощью метода acquireLock?
  • , если да, когда и каким образом сеанс должен быть создано / синхронизировано через?
  • является ли сессия недействительной обычным делом? Согласно документации, это происходит, если одна из служб не может отправить ttl, что может быть очень часто.
  • как живые службы синхронизируются при создании нового сеанса?
  • как службы синхронизируются over lock?

Можете ли вы привести примеры кода или заполнить мою реализацию? Спасибо за любой ответ:]

Ответы [ 2 ]

0 голосов
/ 23 февраля 2020

Вы уже прочитали https://learn.hashicorp.com/consul/developer-configuration/elections? Он просматривает этот сценарий на уровне приложения, использующего Консул для выборов лидеров.

0 голосов
/ 21 февраля 2020

Мне кажется, я понял это сейчас.

Теория такова:

  • Консультационная сессия представляет собой соединение одного сервиса с Консулом. В моем случае это соединение между одним из моих java экземпляров приложения и консулом
  • , который используется для получения блокировки. Когда клиент приходит к Консулу и хочет получить блокировку, Консул проверит, есть ли с ним сессионный идентификатор. Если этого не происходит, Консул предоставляет клиенту блокировку и связывает sessionId клиентов с блокировкой.
  • блокировка ничего особенного. Это просто ключ в карте KV, сохраненный на узле Consul.
  • вы можете проверить блокировку, и если с ней связан любой sessionId, например:
    public class SessionFacade {
        private String leaderLock;
        private String sessionId;
        private Consul client;
        private Config config;

        public SessionFacade(Consul client, Config config) {
            this.client = client;
            this.config = config;
            this.leaderLock = "service/" + config.getService().getName() + "/leader";
            this.sessionId = createSession();
            new SessionHeartBeater(client, sessionId, config.getService().getSessionTtl()).start();
            watchLeaderLockStateChange(sessionId);
            client.keyValueClient().acquireLock(leaderLock, sessionId);
        }

        public boolean doIPossesLeaderLock() {
            Optional<Value> leaderValue = client.keyValueClient().getValue(leaderLock);
            if(leaderValue.isPresent()) {
                Optional<String> session = leaderValue.get().getSession();
                return session.isPresent() && session.get().equals(sessionId);
            }
            return false;
        }

        private String createSession() {
            int sessionTtl = config.getService().getSessionTtl();
            final Session session = ImmutableSession.builder()
                    .name(config.getService().getName())
                    .ttl(sessionTtl + "s")
                    .build();
            return client.sessionClient().createSession(session).getId();
        }

        private void watchLeaderLockStateChange(String sessionId) {
            KeyValueClient keyValueClient = client.keyValueClient();
            KVCache kvCache = KVCache.newCache(keyValueClient, leaderLock, config.getService().getWatchLockEach());
            kvCache.addListener(map -> {
                Value value = map.get(leaderLock);
                if(!value.getSession().isPresent()) {
                    keyValueClient.acquireLock(leaderLock, sessionId);
                }
            });
            kvCache.start();
        }
    }

Обратите внимание, что код вероятно, глючит, поскольку я еще не проверил его полностью.

...