Я строю топологию потоков kafka, в которой мне нужно материализовать содержимое Global KTable в постоянном хранилище RocksDB. Это хранилище будет далее использоваться в каждом экземпляре моего приложения для запросов.
Хранилище состоит из пар значений ключа, где ключ сериализуется с помощью Avro, а полезная нагрузка представляет собой просто байтовый массив.
Моя проблема заключается в том, что я пытаюсь запросить хранилище по определенному ключу, как показано в коде ниже:
fun topology() {
val streamsBuilder = StreamsBuilder()
streamsBuilder.globalTable<Key, ByteArray>(
SOURCE_TOPIC_FOR_GLOBAL_KTABLE,
Consumed.with(null /*Uses default serde provided in config which is SpecificAvroSerde*/,
Serdes.ByteArray()),
Materialized.`as`(STORE_NAME))
val kafkaStreams = KafkaStreams(streamsBuilder.build(), getProperties(), DefaultKafkaClientSupplier())
kafkaStreams.start()
}
private fun getProperties(): Properties {
val properties = Properties()
properties[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = "kafka-brokers-ip"
properties[AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = "schema-registry-ip"
properties[StreamsConfig.APPLICATION_ID_CONFIG] = "app-id"
properties[ConsumerConfig.CLIENT_ID_CONFIG] = "client-id"
properties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
properties[StreamsConfig.STATE_DIR_CONFIG] = "C:/dev/kafka-streams"
properties[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = SpecificAvroSerde::class.java
properties[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = SpecificAvroSerde::class.java
return properties
}
И это функция, которую яиспользуйте для запроса хранилища:
val store = kafkaStreams
.store(STORE_NAME, QueryableStoreTypes.keyValueStore<Key, ByteArray>())
store.all().forEach { keyValue ->
if (keyValue.key == Key("25-OCT-19 USD")) {
//we get here, so the key is in the store
println("Keys are equal.")
println(keyValue) // this gets printed: KeyValue({"value": "25-OCT-19 USD"}, [B@3ca343b5)
println(store.get(Key("25-OCT-19 USD"))) //this is null
println(store.get(keyValue.key)) //this is also null
}
}
Как видно из комментариев кода, в хранилище есть значения, но когда я пытаюсь выполнить запрос по определенному ключу, я ничего не получаю обратно.