У меня есть следующая топология:
- Создает государственный магазин
- Фильтрация записей на основе SOME_CONDITION, сопоставление его значений с новой сущностью и, наконец, публикация этих записей в другой теме STATIONS_LOW_CAPACITY_TOPIC
Однако я вижу это на STATIONS_LOW_CAPACITY_TOPIC:
� null
� null
� null
� {"id":140,"latitude":"40.4592351","longitude":"-3.6915330",...}
� {"id":137,"latitude":"40.4591366","longitude":"-3.6894151",...}
� null
То есть, как будто он также публиковал в теме STATIONS_LOW_CAPACITY_TOPIC те записи, которые не прошли фильтр. Как это возможно? Как я могу предотвратить их публикацию?
Это код ksteams:
kStream.groupByKey().reduce({ _, newValue -> newValue },
Materialized.`as`<Int, Station, KeyValueStore<Bytes, ByteArray>>(STATIONS_STORE)
.withKeySerde(Serdes.Integer())
.withValueSerde(stationSerde))
.filter { _, value -> SOME_CONDITION }
.mapValues { station ->
Stats(XXX)
}
.toStream().to(STATIONS_LOW_CAPACITY_TOPIC, Produced.with(Serdes.Integer(), stationStatsSerde))
ОБНОВЛЕНИЕ: Я просто указал топологию и распечатал полученную таблицу. По какой-то причине последний KTable также содержит записи с нулевым значением, соответствующие вышестоящим записям, которые не прошли фильтр:
kStream.groupByKey().reduce({ _, newValue -> newValue },
Materialized.`as`<Int, BiciMadStation, KeyValueStore<Bytes, ByteArray>>(STATIONS_STORE)
.withKeySerde(Serdes.Integer())
.withValueSerde(stationSerde))
.filter { _, value ->
val conditionResult = (SOME_CONDITION)
println(conditionResult)
conditionResult
}
.print()
Журналы:
false
[KTABLE-FILTER-0000000002]: 1, (null<-null)
false
[KTABLE-FILTER-0000000002]: 2, (null<-null)
false
[KTABLE-FILTER-0000000002]: 3, (null<-null)
false
[KTABLE-FILTER-0000000002]: 4, (null<-null)
true
[KTABLE-FILTER-0000000002]: 5, (Station(id=5, latitude=40.4285524, longitude=-3.7025875, ...)<-null)