Как делегировать событие-захоронение в таблицу KTable, используя selectKey - PullRequest
0 голосов
/ 04 марта 2019

У меня есть следующая конфигурация потока kafka.

StreamBuilder builder = stream("TopicA", Serdes.String(), new 
        SpecificAvroSerde<TestObject>())
    .filter((key, value) -> value!=null)
    .selectKey((key, value) -> value.getSomeProperty())
    .groupByKey(Grouped.with(Serdes.Long(), new 
        SpecificAvroSerde<TestObject>()))
    .reduce((oldValue, newValue) -> newValue), 
        Materialized.as("someStore"));

Это работает, как я ожидаю, но я не могу понять, как я могу обработать сообщение Tombstone для TestObject, даже если я удалю

.filter((key, value) -> value!=null)

Я не могу понять, как я могу иметь дело с 'selectKey', в то время как когда значение приходит как ноль, я не могу отправить сообщение-захоронение с 'value.getSomeProperty ()', в то время как значение будет также нулевым ..

Как бы вы справились с этой проблемой?

1 Ответ

0 голосов
/ 05 марта 2019

Вы можете использовать transform() вместо selectKey() и сохранить старую пару <key,value> в государственном хранилище.Таким образом, при обработке <key,null> вы можете получить предыдущее значение из хранилища, а также получить ранее извлеченный новый ключ и отправить соответствующее захоронение.1007 * ключ или null значение (они будут отброшены).Таким образом, вам нужно будет использовать суррогатное значение вместо null, чтобы получить запись в функцию Reduce.Если суррогат получен, Reduce может вернуть null.

...