Kafka-Streams: Как запросить постоянное хранилище RocksDB при сериализации ключа? - PullRequest
1 голос
/ 31 октября 2019

Я строю топологию потоков kafka, в которой мне нужно материализовать содержимое Global KTable в постоянном хранилище RocksDB. Это хранилище будет далее использоваться в каждом экземпляре моего приложения для запросов.

Хранилище состоит из пар значений ключа, где ключ сериализуется с помощью Avro, а полезная нагрузка представляет собой просто байтовый массив.

Моя проблема заключается в том, что я пытаюсь запросить хранилище по определенному ключу, как показано в коде ниже:

fun topology() {
        val streamsBuilder = StreamsBuilder()

        streamsBuilder.globalTable<Key, ByteArray>(
                SOURCE_TOPIC_FOR_GLOBAL_KTABLE,
                Consumed.with(null /*Uses default serde provided in config which is SpecificAvroSerde*/,
                        Serdes.ByteArray()),
                Materialized.`as`(STORE_NAME))

        val kafkaStreams = KafkaStreams(streamsBuilder.build(), getProperties(), DefaultKafkaClientSupplier())

        kafkaStreams.start()
    }

    private fun getProperties(): Properties {
        val properties = Properties()

        properties[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = "kafka-brokers-ip"
        properties[AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = "schema-registry-ip"
        properties[StreamsConfig.APPLICATION_ID_CONFIG] = "app-id"
        properties[ConsumerConfig.CLIENT_ID_CONFIG] = "client-id"
        properties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
        properties[StreamsConfig.STATE_DIR_CONFIG] = "C:/dev/kafka-streams"

        properties[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = SpecificAvroSerde::class.java
        properties[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = SpecificAvroSerde::class.java

        return properties
    }

И это функция, которую яиспользуйте для запроса хранилища:

val store = kafkaStreams
                .store(STORE_NAME, QueryableStoreTypes.keyValueStore<Key, ByteArray>())

        store.all().forEach { keyValue ->
            if (keyValue.key == Key("25-OCT-19 USD")) {
                //we get here, so the key is in the store
                println("Keys are equal.")
                println(keyValue) // this gets printed: KeyValue({"value": "25-OCT-19 USD"}, [B@3ca343b5)
                println(store.get(Key("25-OCT-19 USD"))) //this is null
                println(store.get(keyValue.key)) //this is also null
            }
        }

Как видно из комментариев кода, в хранилище есть значения, но когда я пытаюсь выполнить запрос по определенному ключу, я ничего не получаю обратно.

...