У меня есть следующая конфигурация потока kafka.
StreamBuilder builder = stream("TopicA", Serdes.String(), new
SpecificAvroSerde<TestObject>())
.filter((key, value) -> value!=null)
.selectKey((key, value) -> value.getSomeProperty())
.groupByKey(Grouped.with(Serdes.Long(), new
SpecificAvroSerde<TestObject>()))
.reduce((oldValue, newValue) -> newValue),
Materialized.as("someStore"));
Это работает, как я ожидаю, но я не могу понять, как я могу обработать сообщение Tombstone для TestObject, даже если я удалю
.filter((key, value) -> value!=null)
Я не могу понять, как я могу иметь дело с 'selectKey', в то время как когда значение приходит как ноль, я не могу отправить сообщение-захоронение с 'value.getSomeProperty ()', в то время как значение будет также нулевым ..
Как бы вы справились с этой проблемой?