Почему Apache Куратор не запускает все обновления? - PullRequest
0 голосов
/ 23 февраля 2020

Пожалуйста, запустите следующее для вашего сервера Zookeeper после создания пустого пути /test/a.

import static java.lang.String.valueOf;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.RetryForever;

public class CacheUpdateTest {
    static final String connectString = "127.0.0.1:2181,127.0.0.1:2191,127.0.0.1:2201";
    static volatile boolean stop = false;

    public static void main(String[] args) throws Exception {
        new Listener().start();
        Thread.sleep(1000);
        new Updater().start();
    }

    private static class Listener extends Thread {
        @SuppressWarnings("resource")
        @Override
        public void run() {
            CuratorFramework client = CuratorFrameworkFactory.builder().connectString(connectString).retryPolicy(new RetryForever(100)).build();
            client.start();

            PathChildrenCache cache = new PathChildrenCache(client, "/test", true);
            cache.getListenable().addListener(new PathChildrenCacheListener() {

                @Override
                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                    if (event.getData() == null || event.getData().getData() == null) return;
                    int newI = Integer.parseInt(new String(event.getData().getData()));
                    System.err.println("Sensed update: " + newI);
                }
            });
            try {
                cache.start(StartMode.BUILD_INITIAL_CACHE);
                Thread.sleep(2000);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    private static class Updater extends Thread {
        @Override
        public void run() {
            try {
                CuratorFramework client = CuratorFrameworkFactory.builder().connectString(connectString).retryPolicy(new RetryForever(100)).build();
                client.start();

                for (int i = 0; i < 10; i++) {
                    // Thread.sleep(100);
                    System.out.println("Updated child: " + i);
                    client.setData().forPath("/test/a", valueOf(i).getBytes());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

Если я раскомментирую строку Thread.sleep (100), я обычно получаю следующий вывод

Updated child: 0
Sensed update: 0
Updated child: 1
Sensed update: 1
Updated child: 2
Sensed update: 2
Updated child: 3
Sensed update: 3
Updated child: 4
Sensed update: 4
Updated child: 5
Sensed update: 5
Updated child: 6
Sensed update: 6
Updated child: 7
Sensed update: 7
Updated child: 8
Sensed update: 8
Updated child: 9
Sensed update: 9

И когда я комментирую, я получаю следующий вывод

Updated child: 0
Updated child: 1
Sensed update: 1 --> Missed 0
Updated child: 2
Updated child: 3
Updated child: 4
Sensed update: 3 --> Missed 2
Updated child: 5
Updated child: 6
Sensed update: 5 --> Missed 4
Updated child: 7
Updated child: 8
Sensed update: 7 --> Missed 6
Updated child: 9
Sensed update: 9 --> Missed 8

Почему я не всегда получаю все уведомления? И почему я не пропустил первый?

1 Ответ

2 голосов
/ 24 февраля 2020

Куратор - это библиотека, созданная для облегчения работы с Apache Zookeeper. PathChildrenCache работает с использованием наблюдателей ZK.

Наблюдатель создает одноразовые часы. Если наблюдатель получает уведомление об изменении (или любой другой операции, на которую он подписан), то Наблюдатель использует Наблюдатель, и он должен снова создать новый Наблюдатель, чтобы в дальнейшем получать уведомление.

В вашем случае PathChildrenCache ищет изменения в узле. Это работает так, чтобы подождать, пока он не получит уведомление от ZK, и заново создать Watch, чтобы продолжить поиск дальнейших изменений.

Поскольку все асинхронно, данные могли изменяться много раз, прежде чем вы получите уведомление об изменении , Вот почему, когда вы устанавливаете задержку в Updater, вы можете видеть все изменения, потому что у кеша достаточно времени, чтобы обнаружить изменение и воссоздать Watch, прежде чем будет вызван новый setData. Когда вы опускаете режим сна, это происходит так быстро, что в кеше пропускаются некоторые события.

Для дальнейшего чтения, пожалуйста, ознакомьтесь с официальной документацией о наблюдателях, главным образом в этом разделе:

Поскольку часы запускаются один раз, и существует задержка между получением события и отправкой нового запроса на получение часов, вы не можете надежно увидеть каждое изменение, которое происходит с узлом в ZooKeeper. Будьте готовы разобраться со случаем, когда znode меняется несколько раз между получением события и настройкой часов снова. (Вам может быть все равно, но хотя бы понять, что это может произойти.)

...