Канал изменений CosmosDB с соединителем зажигания извлекает старые документы, которые уже были обработаны ранее.
Требование : необходимо извлекать только необработанные записи из CosmosDB, когда changefeedstartfromthebeginning = false.
Мы загружаем поток изменений космоса, используя следующий код:
Map<String, String> sourceConfigMap = new HashMap<>();
sourceConfigMap.put("ReadChangeFeed", "true");
sourceConfigMap.put("ChangeFeedStartFromTheBeginning", "false");
sourceConfigMap.put("ChangeFeedUseNextToken","false");
spark.readStream()
.options(sourceConfigMap)
.format(CosmosDBSourceProvider.class.getName())
.load()
Ниже приведены свойства конфигурации:
ReadChangeFeed= true
ChangeFeedStartFromTheBeginning = false
ChangeFeedUseNextToken= false
версии:
group: 'com.microsoft.azure', name: 'azure-documentdb', version: '2.4.0'
'com.microsoft.azure', name: 'azure-cosmosdb', version: '2.4.5'
'com.microsoft.azure:azure-cosmosdb-spark_2.4.0_2.11:2.0.3'