Я настраиваю новое потоковое приложение Kafka и хочу использовать пользовательское хранилище состояний с использованием RocksDb.Это нормально работает для помещения данных в хранилище состояний и получения из него запрашиваемого хранилища состояний и перебора данных. Однако примерно через 72 часа я наблюдаю, что в хранилище отсутствуют данные.Существует ли время хранения данных по умолчанию для хранилища состояний в потоках Kafka или в RocksDb?
Я использую хранилище настраиваемых состояний с использованием RocksDb, чтобы мы могли использовать функцию семейства столбцов, которую мы не можем использовать свстроенная реализация RocksDb с KStreams.Я реализовал пользовательское хранилище, используя интерфейс KeyValueStore.И еще у меня есть собственные StoreSupplier, StoreBuilder, StoreType и StoreWrapper.Раздел для журнала изменений создан для приложения, но к нему еще не поступают данные (еще не рассматривал эту проблему).
Помещение данных в это пользовательское хранилище состояний и получение из него запрашиваемого хранилища состояний работает нормально,Тем не менее, я вижу, что данные отсутствуют примерно через 72 часа из магазина.Я проверил, получив размер каталога хранилища состояний, а также экспортировав данные в файлы и проверив количество записей.
Использование сжатия SNAPPY и сжатия UNIVERSAL
Простая топология:
final StreamsBuilder builder = new StreamsBuilder();
String storeName = "store-name"
List<String> cfNames = new ArrayList<>();
// Hybrid custom store
final StoreBuilder customStore = new RocksDBColumnFamilyStoreBuilder(storeName, cfNames);
builder.addStateStore(customStore);
KStream<String, String> inputstream = builder.stream(
inputTopicName,
Consumed.with(Serdes.String(), Serdes.String()
));
inputstream
.transform(() -> new CurrentTransformer(storeName), storeName);
Topology tp = builder.build();
Фрагмент из реализации пользовательского хранилища:
RocksDBColumnFamilyStore(final String name, final String parentDir, List<String> columnFamilyNames) {
.....
......
final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig()
.setBlockCache(cache)
.setBlockSize(BLOCK_SIZE)
.setCacheIndexAndFilterBlocks(true)
.setPinL0FilterAndIndexBlocksInCache(true)
.setFilterPolicy(filter)
.setCacheIndexAndFilterBlocksWithHighPriority(true)
.setPinTopLevelIndexAndFilter(true)
;
cfOptions = new ColumnFamilyOptions()
.setCompressionType(CompressionType.SNAPPY_COMPRESSION)
.setCompactionStyle(CompactionStyle.UNIVERSAL)
.setMaxWriteBufferNumber(MAX_WRITE_BUFFERS)
.setOptimizeFiltersForHits(true)
.setLevelCompactionDynamicLevelBytes(true)
.setTableFormatConfig(tableConfig);
columnFamilyDescriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, cfOptions));
columnFamilyNames.stream().forEach((cfName) -> columnFamilyDescriptors.add(new ColumnFamilyDescriptor(cfName.getBytes(), cfOptions)));
}
@SuppressWarnings("unchecked")
public void openDB(final ProcessorContext context) {
Options opts = new Options()
.prepareForBulkLoad();
options = new DBOptions(opts)
.setCreateIfMissing(true)
.setErrorIfExists(false)
.setInfoLogLevel(InfoLogLevel.INFO_LEVEL)
.setMaxOpenFiles(-1)
.setWriteBufferManager(writeBufferManager)
.setIncreaseParallelism(Math.max(Runtime.getRuntime().availableProcessors(), 2))
.setCreateMissingColumnFamilies(true);
fOptions = new FlushOptions();
fOptions.setWaitForFlush(true);
dbDir = new File(new File(context.stateDir(), parentDir), name);
try {
Files.createDirectories(dbDir.getParentFile().toPath());
db = RocksDB.open(options, dbDir.getAbsolutePath(), columnFamilyDescriptors, columnFamilyHandles);
columnFamilyHandles.stream().forEach((handle) -> {
try {
columnFamilyMap.put(new String(handle.getName()), handle);
} catch (RocksDBException e) {
throw new ProcessorStateException("Error opening store " + name + " at location " + dbDir.toString(), e);
}
});
} catch (RocksDBException e) {
throw new ProcessorStateException("Error opening store " + name + " at location " + dbDir.toString(), e);
}
open = true;
}
Ожидается, что хранилище состояний (RocksDb) будет хранить данные в течение неопределенного времени до тех пор, пока не будет удалено вручную или пока диск хранилища не выйдет из строя.Я не знаю, что потоки Kafka ввели TTl с государственными хранилищами.