Существует ли политика хранения для пользовательского хранилища состояний (RocksDb) с потоками Kafka? - PullRequest
0 голосов
/ 10 июля 2019

Я настраиваю новое потоковое приложение Kafka и хочу использовать пользовательское хранилище состояний с использованием RocksDb.Это нормально работает для помещения данных в хранилище состояний и получения из него запрашиваемого хранилища состояний и перебора данных. Однако примерно через 72 часа я наблюдаю, что в хранилище отсутствуют данные.Существует ли время хранения данных по умолчанию для хранилища состояний в потоках Kafka или в RocksDb?

Я использую хранилище настраиваемых состояний с использованием RocksDb, чтобы мы могли использовать функцию семейства столбцов, которую мы не можем использовать свстроенная реализация RocksDb с KStreams.Я реализовал пользовательское хранилище, используя интерфейс KeyValueStore.И еще у меня есть собственные StoreSupplier, StoreBuilder, StoreType и StoreWrapper.Раздел для журнала изменений создан для приложения, но к нему еще не поступают данные (еще не рассматривал эту проблему).

Помещение данных в это пользовательское хранилище состояний и получение из него запрашиваемого хранилища состояний работает нормально,Тем не менее, я вижу, что данные отсутствуют примерно через 72 часа из магазина.Я проверил, получив размер каталога хранилища состояний, а также экспортировав данные в файлы и проверив количество записей.

Использование сжатия SNAPPY и сжатия UNIVERSAL

Простая топология:

            final StreamsBuilder builder = new StreamsBuilder();
            String storeName = "store-name"
            List<String> cfNames = new ArrayList<>();


            // Hybrid custom store
            final StoreBuilder customStore = new RocksDBColumnFamilyStoreBuilder(storeName, cfNames);
            builder.addStateStore(customStore);

            KStream<String, String> inputstream = builder.stream(
                    inputTopicName,
                    Consumed.with(Serdes.String(), Serdes.String()
                    ));

            inputstream
                    .transform(() -> new CurrentTransformer(storeName), storeName);

            Topology tp = builder.build();

Фрагмент из реализации пользовательского хранилища:

 RocksDBColumnFamilyStore(final String name, final String parentDir, List<String> columnFamilyNames) {
     .....  
     ......

        final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig()
                .setBlockCache(cache)
                .setBlockSize(BLOCK_SIZE)
                .setCacheIndexAndFilterBlocks(true)
                .setPinL0FilterAndIndexBlocksInCache(true)
                .setFilterPolicy(filter)
                .setCacheIndexAndFilterBlocksWithHighPriority(true)
                .setPinTopLevelIndexAndFilter(true)
                ;


        cfOptions = new ColumnFamilyOptions()
                .setCompressionType(CompressionType.SNAPPY_COMPRESSION)
                .setCompactionStyle(CompactionStyle.UNIVERSAL)
                .setMaxWriteBufferNumber(MAX_WRITE_BUFFERS)
                .setOptimizeFiltersForHits(true)
                .setLevelCompactionDynamicLevelBytes(true)
                .setTableFormatConfig(tableConfig);


        columnFamilyDescriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, cfOptions));

        columnFamilyNames.stream().forEach((cfName) -> columnFamilyDescriptors.add(new ColumnFamilyDescriptor(cfName.getBytes(), cfOptions)));
    }

    @SuppressWarnings("unchecked")
    public void openDB(final ProcessorContext context) {
        Options opts = new Options()
                .prepareForBulkLoad();

        options = new DBOptions(opts)
                .setCreateIfMissing(true)
                .setErrorIfExists(false)
                .setInfoLogLevel(InfoLogLevel.INFO_LEVEL)
                .setMaxOpenFiles(-1)
                .setWriteBufferManager(writeBufferManager)
                .setIncreaseParallelism(Math.max(Runtime.getRuntime().availableProcessors(), 2))
                .setCreateMissingColumnFamilies(true);

        fOptions = new FlushOptions();
        fOptions.setWaitForFlush(true);

        dbDir = new File(new File(context.stateDir(), parentDir), name);

            try {
               Files.createDirectories(dbDir.getParentFile().toPath());
                db = RocksDB.open(options, dbDir.getAbsolutePath(), columnFamilyDescriptors, columnFamilyHandles);

                columnFamilyHandles.stream().forEach((handle) -> {
                    try {
                        columnFamilyMap.put(new String(handle.getName()), handle);
                    } catch (RocksDBException e) {
                        throw new ProcessorStateException("Error opening store " + name + " at location " + dbDir.toString(), e);
                    }
                });
            } catch (RocksDBException e) {
                throw new ProcessorStateException("Error opening store " + name + " at location " + dbDir.toString(), e);
            }
        open = true;
    }

Ожидается, что хранилище состояний (RocksDb) будет хранить данные в течение неопределенного времени до тех пор, пока не будет удалено вручную или пока диск хранилища не выйдет из строя.Я не знаю, что потоки Kafka ввели TTl с государственными хранилищами.

...