Я пытаюсь запустить 2 экземпляра ChangeFeedProcessor, оба указывают на одну и ту же коллекцию и используют ту же коллекцию аренды в учетной записи Cosmos. Я указал уникальный hostName в обоих экземплярах
Я хочу, чтобы загрузка каналов распределялась между экземплярами в соответствии с логическими разделами (согласно документации Microsoft)
Когда я пытаюсь запустить второй экземпляр, я получаю в консоли следующее исключение:
Есть ли другой способ добиться этого?
Исключение в потоке "pool-23- thread-3 "java .lang.NullPointerException at com. azure .data.cosmos.internal.changefeed.implementation.ExceptionClassifier.classifyClientException (ExceptionClassifier. java: 56) в com. azure .data.cosmos .internal.changefeed.implementation.PartitionProcessorImpl.lambda $ run $ 0 (PartitionProcessorImpl. java: 115) в реакторе.core.publisher.MonoRunnable.block (МоноРанн. cosmos.internal.changefeed.implementation.PartitionSupervisorImpl $ 1.run (PartitionSupervisorImpl. java: 89) в java .lang.Thread.run (поток. java: 748) в java .util.concurrent.Thre adPoolExecutor.runWorker (ThreadPoolExecutor. java: 1149) в java .util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor. java: 624) в java .lang.Thread.run (поток. * 1049) *: 748) Исключение в потоке "pool-19-thread-3" java .lang.NullPointerException at com. azure .data.cosmos.internal.changefeed.implementation.ExceptionClassifier.classifyClientException (ExceptionClassifier. java: 56) в com. azure .data.cosmos.internal.changefeed.implementation.PartitionProcessorImpl.lambda $ run $ 0 (PartitionProcessorImpl. java: 115) в реакторе.core.publisher.MonoRunnable.block (MonoRunnable. java : 66) в com. azure .data.cosmos.internal.changefeed.implementation.PartitionSupervisorImpl $ 1.run (PartitionSupervisorImpl. java: 89) в java .lang.Thread.run (поток. java: 748) в java .util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor. java: 1149) в java .util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor. java: 624) в 1064 .lang.Thread.run (Тема. java: 748) Исключение в нить "pool-25-thread-3" java .lang.NullPointerException в com. azure .data.cosmos.internal.changefeed.implementation.ExceptionClassifier.classifyClientException (ExceptionClassifier. java: 56) в com. azure .data.cosmos.internal.changefeed.implementation.PartitionProcessorImpl.lambda $ run $ 0 (PartitionProcessorImpl. java: 115) ... et c
Я использовал приведенный ниже maven зависимость
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-cosmos</artifactId>
<version>3.0.0</version>
<exclusions>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
</exclusions>
</dependency>
КОД SNIPPET
- создание списка ChangeFeedProcessors (для всех контейнеров, найденных в базе данных)
//FEED DATABASE
CosmosDatabase feedDatabase = cosmosClient.getDatabase(cosmosDbName);
//LEASE DATABASE
CosmosDatabase leaseDatabase = cosmosClient.getDatabase(cosmosDbName + LEASES);
//List of Containers in Feed Database
List<CosmosContainerProperties> containerPropertiesList = null;
try {
Flux<FeedResponse<CosmosContainerProperties>> containers = feedDatabase.readAllContainers();
List<FeedResponse<CosmosContainerProperties>> list = containers.toStream().collect(Collectors.toList());//Abhishek Optimize
containerPropertiesList = list.get(0).results();
}
catch (Exception e) {
System.out.println("Fail to query Containers");
throw new ServiceException("Fail to query Containers");
}
containerPropertiesList.parallelStream().forEach(cosmosContainerProperties -> {
//FEED CONTAINER
String containerName = cosmosContainerProperties.getString("id");
CosmosContainer feedContainer = feedDatabase.getContainer(containerName);
//LEASE CONTAINER
String leaseContainerName = containerName + "-leases";
CosmosContainer leaseContainer = leaseDatabase.getContainer(leaseContainerName);
//Building ChangeFeedProcessor for current Container
ChangeFeedProcessorOptions changeFeedProcessorOptions = new ChangeFeedProcessorOptions();
changeFeedProcessorOptions.startTime(OffsetDateTime.now());
ChangeFeedProcessor changeFeedProcessor = null;
try {
ChangeFeedProcessor.BuilderDefinition builderDefinition = ChangeFeedProcessor.Builder()
.hostName("Host1")//used Host2 in the other Host
.feedContainer(feedContainer)
.leaseContainer(leaseContainer)
.options(changeFeedProcessorOptions)
.handleChanges(docs -> {
documentChangeHandler.processChanges(containerName, docs);
});
changeFeedProcessor = builderDefinition.build();
}
catch (Exception e) {
System.out.println("Fail to initialize ChangeFeedProcessor for " + containerName);
}
resultList.add(changeFeedProcessor);
System.out.println("processed: " + leaseContainerName);
});
Затем возвращается
resultList и запускаются процессоры ChangeFeed в следующем методе
public void startChangeFeed() {
if (null != changeFeedProcessors && !changeFeedProcessors.isEmpty()) {
changeFeedProcessors.parallelStream().forEach(processor->processor.start().block());
}
else {
System.out.println("changeFeedProcessors list is empty.. probably changeFeedProcessor has not been setup yet");
}
}