Я пытаюсь сделать простой stream.leftJoin(table)
, но получаю следующее исключение во время выполнения:
TopologyException: Invalid topology: StateStore null is not added yet
Вот так примерно выглядит мой код, я прокомментировал детали реализации, чтобы он был коротким:
val streamsConfiguration: Properties = {
val p = new Properties()
// api config
p.put(StreamsConfig.APPLICATION_ID_CONFIG /**/)
p.put(StreamsConfig.CLIENT_ID_CONFIG /**/)
// kafka broker
p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
// local state store
p.put(StreamsConfig.STATE_DIR_CONFIG, "./streams-state")
p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
// serdes
p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, classOf[StringSerde])
p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, classOf[StringSerde])
p
}
val builder = new StreamsBuilderS()
val rawInfoTable: KTableS[String, String] = builder.table("station_info")
val infoTable: KTableS[String, StationInfo] = rawInfoTable.mapValues{jsonString =>
/** */
}.filter(/** */)
.mapValues((/** */)
val rawStatusStream: KStreamS[String, String] = builder.stream("station_status")
val statusStream: KStreamS[String, StationStatus] = rawStatusStream.flatMapValues{jsonString =>
/** */
}
val outputStream: KStreamS[String, String] = statusStream
.leftJoin(infoTable, calculateStats)
.filter((_, availability) => {
/** */
})
.map((stationId: String, availability) => {
/** */
})
outputStream.to("low_availability")
val streams = new KafkaStreams(builder.build(), streamsConfiguration)
streams.cleanUp()
streams.start()
Я даже пытался вручную добавить StateStore
с помощью:
val store = Stores.inMemoryKeyValueStore("my-store")
val storeBuilder = Stores.keyValueStoreBuilder(store, new StringSerde(), new StringSerde())
val builder = new StreamsBuilderS()
builder.addStateStore(storeBuilder)
Но, похоже, это ничего не меняет. Я использую упаковщик потоков kafka от lightbend: "com.lightbend" %% "kafka-streams-scala" % "0.2.1"
Все примеры, которые я проверял, похоже, не заботятся о добавлении хранилища состояний, поэтому я несколько растерялся. Может ли кто-нибудь указать мне правильное направление? Это должно что-то делать STATE_DIR_CONFIG
? Или с кластером Кафка я работаю локально?