Запуск нескольких экземпляров ChangeFeedProcessor для Azure Cosmos - PullRequest
0 голосов
/ 07 января 2020

Я пытаюсь запустить 2 экземпляра ChangeFeedProcessor, оба указывают на одну и ту же коллекцию и используют ту же коллекцию аренды в учетной записи Cosmos. Я указал уникальный hostName в обоих экземплярах

Я хочу, чтобы загрузка каналов распределялась между экземплярами в соответствии с логическими разделами (согласно документации Microsoft)

Когда я пытаюсь запустить второй экземпляр, я получаю в консоли следующее исключение:

Есть ли другой способ добиться этого?

Исключение в потоке "pool-23- thread-3 "java .lang.NullPointerException at com. azure .data.cosmos.internal.changefeed.implementation.ExceptionClassifier.classifyClientException (ExceptionClassifier. java: 56) в com. azure .data.cosmos .internal.changefeed.implementation.PartitionProcessorImpl.lambda $ run $ 0 (PartitionProcessorImpl. java: 115) в реакторе.core.publisher.MonoRunnable.block (МоноРанн. cosmos.internal.changefeed.implementation.PartitionSupervisorImpl $ 1.run (PartitionSupervisorImpl. java: 89) в java .lang.Thread.run (поток. java: 748) в java .util.concurrent.Thre adPoolExecutor.runWorker (ThreadPoolExecutor. java: 1149) в java .util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor. java: 624) в java .lang.Thread.run (поток. * 1049) *: 748) Исключение в потоке "pool-19-thread-3" java .lang.NullPointerException at com. azure .data.cosmos.internal.changefeed.implementation.ExceptionClassifier.classifyClientException (ExceptionClassifier. java: 56) в com. azure .data.cosmos.internal.changefeed.implementation.PartitionProcessorImpl.lambda $ run $ 0 (PartitionProcessorImpl. java: 115) в реакторе.core.publisher.MonoRunnable.block (MonoRunnable. java : 66) в com. azure .data.cosmos.internal.changefeed.implementation.PartitionSupervisorImpl $ 1.run (PartitionSupervisorImpl. java: 89) в java .lang.Thread.run (поток. java: 748) в java .util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor. java: 1149) в java .util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor. java: 624) в 1064 .lang.Thread.run (Тема. java: 748) Исключение в нить "pool-25-thread-3" java .lang.NullPointerException в com. azure .data.cosmos.internal.changefeed.implementation.ExceptionClassifier.classifyClientException (ExceptionClassifier. java: 56) в com. azure .data.cosmos.internal.changefeed.implementation.PartitionProcessorImpl.lambda $ run $ 0 (PartitionProcessorImpl. java: 115) ... et c

Я использовал приведенный ниже maven зависимость

<dependency>
    <groupId>com.microsoft.azure</groupId>
    <artifactId>azure-cosmos</artifactId>
    <version>3.0.0</version>
    <exclusions>
        <exclusion>
            <artifactId>slf4j-api</artifactId>
            <groupId>org.slf4j</groupId>
        </exclusion>
        <exclusion>
            <artifactId>slf4j-log4j12</artifactId>
            <groupId>org.slf4j</groupId>
        </exclusion>
        <exclusion>
            <artifactId>guava</artifactId>
            <groupId>com.google.guava</groupId>
        </exclusion>
    </exclusions>
</dependency>

КОД SNIPPET

  1. создание списка ChangeFeedProcessors (для всех контейнеров, найденных в базе данных)

        //FEED DATABASE
        CosmosDatabase feedDatabase = cosmosClient.getDatabase(cosmosDbName);

        //LEASE DATABASE
        CosmosDatabase leaseDatabase = cosmosClient.getDatabase(cosmosDbName + LEASES);

        //List of Containers in Feed Database
        List<CosmosContainerProperties> containerPropertiesList = null;
        try {
            Flux<FeedResponse<CosmosContainerProperties>> containers = feedDatabase.readAllContainers();
            List<FeedResponse<CosmosContainerProperties>> list = containers.toStream().collect(Collectors.toList());//Abhishek Optimize
            containerPropertiesList = list.get(0).results();
        }
        catch (Exception e) {
            System.out.println("Fail to query Containers");
            throw new ServiceException("Fail to query Containers");
        }

containerPropertiesList.parallelStream().forEach(cosmosContainerProperties -> {
                //FEED CONTAINER
                String containerName = cosmosContainerProperties.getString("id");
                CosmosContainer feedContainer = feedDatabase.getContainer(containerName);

                //LEASE CONTAINER
                String leaseContainerName = containerName + "-leases";
                CosmosContainer leaseContainer = leaseDatabase.getContainer(leaseContainerName);

                //Building ChangeFeedProcessor for current Container
                ChangeFeedProcessorOptions changeFeedProcessorOptions = new ChangeFeedProcessorOptions();
                changeFeedProcessorOptions.startTime(OffsetDateTime.now());

                ChangeFeedProcessor changeFeedProcessor = null;
                try {
                    ChangeFeedProcessor.BuilderDefinition builderDefinition = ChangeFeedProcessor.Builder()                           
                            .hostName("Host1")//used Host2 in the other Host
                            .feedContainer(feedContainer)
                            .leaseContainer(leaseContainer)
                            .options(changeFeedProcessorOptions)
                            .handleChanges(docs -> {
                                documentChangeHandler.processChanges(containerName, docs);
                            });
                    changeFeedProcessor = builderDefinition.build();
                }
                catch (Exception e) {
                    System.out.println("Fail to initialize ChangeFeedProcessor for " + containerName);
                }
                resultList.add(changeFeedProcessor);

                System.out.println("processed:  " + leaseContainerName);
            });
Затем возвращается resultList и запускаются процессоры ChangeFeed в следующем методе
public void startChangeFeed() {
        if (null != changeFeedProcessors && !changeFeedProcessors.isEmpty()) {
            changeFeedProcessors.parallelStream().forEach(processor->processor.start().block());
        }
        else {
            System.out.println("changeFeedProcessors list is empty.. probably changeFeedProcessor has not been setup yet");
        }
    }

1 Ответ

0 голосов
/ 17 января 2020

Судя по комментариям, проблема связана с VPN / Proxy или чем-то, что блокирует требуемые диапазоны портов.

В прямом режиме необходимо открыть и настроить определенный диапазон портов в VPN / Proxy / Firewall:

Connection Modes

Если настройка невозможна, вы можете переключиться в режим шлюза / HTTP.

Процессор каналов изменений использует вторую аренду сбор данных для сохранения состояния (в основном объясняется здесь https://docs.microsoft.com/azure/cosmos-db/change-feed-processor#components -процесс-изменяющий процессор вместе с NET выборками, но концепции совпадают). Текущая модель создает 1 аренду на физический раздел (я говорю текущую модель, потому что эта реализация может улучшиться в будущем для лучшего распределения), и каждая аренда может принадлежать только 1 экземпляру. Таким образом, если у вас есть 2 аренды и 2 экземпляра, каждому из них будет принадлежать 1 аренда.

Каждый экземпляр будет обрабатывать изменения в разделе (ях) на основе аренды, которой он владеет.

Распределение нагрузки 90/10 означает, что изменения, происходящие в вашей коллекции, по-видимому, искажены и происходят в основном в одном разделе (горячем разделе) и распределены неравномерно.

...