У меня есть KStream<String, X>
, который я, по сути, хочу преобразовать в KTable<String, Y>
. Единственный способ, которым я мог бы добиться этого, используя DSL, - это карта, группа затем уменьшается.
val stream: KStream<String, X> = ...
val table: KTable<String, Y> = stream
.mapValues({ value -> toYOrNull(value)})
.groupByKey(Grouped.with(Serdes.String(), ySerde))
.reduce(
{old: Y?, updated: Y? -> updated},
Materialized.`as`<String, Y, KeyValueStore<Bytes, ByteArray>>("y-store")
.withKeySerde(Serdes.String()
.withValueSerde(ySerde)
)
Я ожидаю, что это обработает случай, когда значение updated
в reduce
равно null
, однако, когда я проверяю хранилище с помощью TopologyTestDriver
, оно все еще имеет старую версию. Что я делаю не так?
Это мой тест:
@Test
fun shouldDeleteFromTableWhenNull() {
val store = testDriver.getKeyValueStore<String, Y?>("y-store")
store.put("key", Y())
inputTopic.pipeInput("key", anXThatMapsToANullY)
assertThat(store.get("key")).isNull() // Fails as the old entry is still there
}