правильно удалить запись из Kafka Ktable - PullRequest
0 голосов
/ 25 января 2019

Я хочу правильно удалить объект из KTable.

У меня есть следующие данные потоковой обработки:

input
.map(this::deserialize)
.filter(((key, event) ->
    Arrays.asList(
        "AssessmentInitialized",
        "AssessmentRenamed",
        "AssessmentDeleted"
    ).contains(event.eventType())))
.map( ... do mapping  ... )
.groupBy((key, event) -> key, Serialized.with(Serdes.String(), domainEventSerde))
.aggregate(
    Assessment::new,
    (key, event, assessment) -> assessment.handleEvent(event),
    Materialized.<String, Assessment, KeyValueStore<Bytes, byte[]>>as(ASSESSMENTS_STORE)
        .withKeySerde(Serdes.String())
        .withValueSerde(assessmentSerde)
);

и assessment.handleEvent(event) возвращает null при обработке события AssessmentDeleted. Serde - this.assessmentSerde = new JsonSerde<>(Assessment.class, objectMapper);, где mapper - по умолчанию bean-компонент com.fasterxml.jackson.databind.ObjectMapper.

После того, как событие I обработано этим потоком, я вижу в Kafka KTable список изменений следующее значение:

{
"topic": "events-consumer-v1-assessments-store-changelog",
"key": "5d6d7a70-c907-460f-ac88-69705f079939",
"value": "ée",
"partition": 0,
"offset": 2
}

И это не похоже на то, что я хочу иметь. Это правильный способ удаления объекта? Я ожидал, что если я добавлю значение NULL в качестве значения агрегирующей операции, оно будет удалено, но похоже, что осталось немного мусора, и я не уверен, что проблема в его отображении, неправильный способ удаления или правильное поведение KTable.

1 Ответ

0 голосов
/ 25 января 2019

В вашем случае кажется, что проблема была в инструменте проверки.По какой-то причине он десериализует значение null не правильно.Всегда полезно использовать инструменты Kafka, чтобы сначала проверить его (kafka-console-consumer.sh, kafka-console-producer.sh).

...