Почему записи топологии KStream / KTable не проходят фильтр? - PullRequest
0 голосов
/ 30 августа 2018

У меня есть следующая топология:

  1. Создает государственный магазин
  2. Фильтрация записей на основе SOME_CONDITION, сопоставление его значений с новой сущностью и, наконец, публикация этих записей в другой теме STATIONS_LOW_CAPACITY_TOPIC

Однако я вижу это на STATIONS_LOW_CAPACITY_TOPIC:

�   null
�   null
�   null
�   {"id":140,"latitude":"40.4592351","longitude":"-3.6915330",...}
�   {"id":137,"latitude":"40.4591366","longitude":"-3.6894151",...}
�   null

То есть, как будто он также публиковал в теме STATIONS_LOW_CAPACITY_TOPIC те записи, которые не прошли фильтр. Как это возможно? Как я могу предотвратить их публикацию?

Это код ksteams:

kStream.groupByKey().reduce({ _, newValue -> newValue },
                Materialized.`as`<Int, Station, KeyValueStore<Bytes, ByteArray>>(STATIONS_STORE)
                        .withKeySerde(Serdes.Integer())
                        .withValueSerde(stationSerde))
                .filter { _, value -> SOME_CONDITION }
                .mapValues { station ->
                    Stats(XXX)
                }
                .toStream().to(STATIONS_LOW_CAPACITY_TOPIC, Produced.with(Serdes.Integer(), stationStatsSerde))

ОБНОВЛЕНИЕ: Я просто указал топологию и распечатал полученную таблицу. По какой-то причине последний KTable также содержит записи с нулевым значением, соответствующие вышестоящим записям, которые не прошли фильтр:

kStream.groupByKey().reduce({ _, newValue -> newValue },
                Materialized.`as`<Int, BiciMadStation, KeyValueStore<Bytes, ByteArray>>(STATIONS_STORE)
                        .withKeySerde(Serdes.Integer())
                        .withValueSerde(stationSerde))
                .filter { _, value ->
                    val conditionResult = (SOME_CONDITION)
                    println(conditionResult)
                    conditionResult
                }
                .print()

Журналы:

false
[KTABLE-FILTER-0000000002]: 1, (null<-null)
false
[KTABLE-FILTER-0000000002]: 2, (null<-null)
false
[KTABLE-FILTER-0000000002]: 3, (null<-null)
false
[KTABLE-FILTER-0000000002]: 4, (null<-null)
true
[KTABLE-FILTER-0000000002]: 5, (Station(id=5, latitude=40.4285524, longitude=-3.7025875, ...)<-null)

1 Ответ

0 голосов
/ 30 августа 2018

Ответ был в javadoc KTable.filter(...):

Обратите внимание, что фильтр для потока изменений работает иначе, чем запись потоковые фильтры, потому что записи с нулевыми значениями (так называемый надгробный камень записи) имеют семантику удаления. Таким образом, для надгробий предусмотрены Предикат фильтра не оценивается, но запись захоронения переадресация напрямую, если требуется (т. е. если есть что удален). Кроме того, для каждой записи, которая удаляется (то есть точка не удовлетворяет данному предикату) пересылается запись надгробия.

Это объясняет, почему я вижу записи с нулевым значением (надгробная плита), отправленные вниз по течению.

Чтобы избежать этого, я преобразовал KTable в KStream, а затем применил фильтр:

kStream.groupByKey().reduce({ _, newValue -> newValue },
                Materialized.`as`<Int, Stations, KeyValueStore<Bytes, ByteArray>>(STATIONS_STORE)
                        .withKeySerde(Serdes.Integer())
                        .withValueSerde(stationSerde))
                .toStream()
                .filter { _, value -> SOME_CONDITION }
                .mapValues { station ->
                    StationStats(station.id, station.latitude, station.longitude, ...)
                }
                .to(STATIONS_LOW_CAPACITY_TOPIC, Produced.with(Serdes.Integer(), stationStatsSerde))

Результат:

4   {"id":4,"latitude":"40.4302937","longitude":"-3.7069171",...}
5   {"id":5,"latitude":"40.4285524","longitude":"-3.7025875",...}
...
...