Я новичок в потоках Kafka, и я попытался перебрать элементы в таблице Streams kafka через keyValueStore:
Идея состоит в том, чтобы создать Ktable (я также пробовал с globalKTable) сKeyValueStore.Затем отдельный поток отвечает за чтение содержимого KeyValueStore, чтобы выполнить итерацию по последнему значению каждого ключа.
val streamProperties: Properties = {
val p = new Properties()
p.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application")
p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, config.getStringList("kafka.bootstrap.servers").toList.mkString(","))
p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray.getClass.getName)
p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
p
}
val builder: StreamsBuilder = new StreamsBuilder()
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.state.KeyValueStore
val globalTable = builder.table("test",
Materialized
.as[String, Array[Byte], KeyValueStore[org.apache.kafka.common.utils.Bytes, Array[Byte]]]("TestStore")
.withCachingDisabled()
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.ByteArray())
)
val streams: KafkaStreams = new KafkaStreams(builder.build(), streamProperties)
streams.start()
val ex = new ScheduledThreadPoolExecutor(1)
val task = new Runnable {
def run() = {
println("\n\n\n tick \n\n\n")
try {
val keyValueStore = streams.store(globalTable.queryableStoreName(), QueryableStoreTypes.keyValueStore())
keyValueStore.all().toIterator.map { k =>
print(k.key)
}
} catch {
case _ => println("error")
}
}
}
val f = ex.scheduleAtFixedRate(task, 1, 10, TimeUnit.SECONDS)
}
}
В потоке keyValueStore остается пустым, даже когда я создаю элементы по теме "test".
Что-то я пропустил или не понял?