Надгробные сообщения не удаляют запись из хранилища состояний KTable? - PullRequest
0 голосов
/ 05 июня 2018

Я создаю данные обработки KTable из KStream.Но когда я запускаю сообщения-захоронения с ключом и нулевой полезной нагрузкой, это не удаляет сообщение из KTable.

sample -

public KStream<String, GenericRecord> processRecord(@Input(Channel.TEST) KStream<GenericRecord, GenericRecord> testStream,
KTable<String, GenericRecord> table = testStream
                .map((genericRecord, genericRecord2) -> KeyValue.pair(genericRecord.get("field1") + "", genericRecord2))
                .groupByKey()
                reduce((genericRecord, v1) -> v1, Materialized.as("test-store"));


GenericRecord genericRecord = new GenericData.Record(getAvroSchema(keySchema));
genericRecord.put("field1", Long.parseLong(test.getField1()));
ProducerRecord record = new ProducerRecord(Channel.TEST, genericRecord, null);
kafkaTemplate.send(record);

После запуска сообщения с нулевым значением я могу выполнить отладку вФункция отображения testStream с нулевой полезной нагрузкой, но не удаляет запись в журнале изменений KTable «test-store».Похоже, что он даже не достигает метода уменьшения, не уверен, что мне здесь не хватает.

Ценю любую помощь по этому вопросу!

Спасибо.

1 Ответ

0 голосов
/ 06 июня 2018

Как указано в JavaDocs reduce()

Записи с ключом или значением {@code null} игнорируются.

Поскольку запись <key,null>удаляется и, таким образом, (genericRecord, v1) -> v1 никогда не выполняется, надгробие не записывается в тему хранилища или журнала изменений.

Для случая использования, который вы имеете в виду, необходимо использовать суррогатное значение, которое указывает «удалить»,например, логический флаг в вашей записи Avro.Ваша функция Reduce должна проверить флаг и вернуть null, если флаг установлен;в противном случае он должен регулярно обрабатывать запись.

...