Я хочу использовать Java KStream в Kafka, чтобы отфильтровать все те значения, которые превышают определенное значение.Значения обмениваются как JSON, например:
ConsumerRecord(topic=u'test', partition=0, offset=1109, timestamp=1528110096230L, timestamp_type=0, key=None, value='{"device":"Internal","sensor":"Phone Microphone","value":"72.1"}', checksum=None, serialized_key_size=-1, serialized_value_size=64)
Я хочу отфильтровать значения ниже 20,0 (в приведенном выше случае значение равно 72,1, и это нормально)
public class WordCountExample {
@SuppressWarnings("deprecation")
public static void main(String[] args) throws Exception{
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "Filter");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "andrewnetwork.ddns.net:9095");
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("test");
source = source
.filterNot((k,v) -> {
if(isParsableAsDouble(v) && Double.parseDouble(v) <= 50.0)
return true;
else return false;
});
source.to("mem");
Фильтрация не происходит, и я не знаю почему.Есть идеи?