Фильтрация значений за порог с помощью KStream - PullRequest
0 голосов
/ 04 июня 2018

Я хочу использовать Java KStream в Kafka, чтобы отфильтровать все те значения, которые превышают определенное значение.Значения обмениваются как JSON, например:

ConsumerRecord(topic=u'test', partition=0, offset=1109, timestamp=1528110096230L, timestamp_type=0, key=None, value='{"device":"Internal","sensor":"Phone Microphone","value":"72.1"}', checksum=None, serialized_key_size=-1, serialized_value_size=64)

Я хочу отфильтровать значения ниже 20,0 (в приведенном выше случае значение равно 72,1, и это нормально)

public class WordCountExample {

@SuppressWarnings("deprecation")
public static void main(String[] args) throws Exception{

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "Filter");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "andrewnetwork.ddns.net:9095");
    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");


    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> source = builder.stream("test");

    source =  source
    .filterNot((k,v) -> {
        if(isParsableAsDouble(v) && Double.parseDouble(v) <= 50.0)
            return true;
        else return false;
    });

    source.to("mem");

Фильтрация не происходит, и я не знаю почему.Есть идеи?

1 Ответ

0 голосов
/ 04 июня 2018

Помещая isParsableAsDouble(v) в filterNot, вы отфильтровываете все, потому что JSON не разбирается как двойное число.Я полагаю, что вы неправильно понимаете значения Кафки и поле value в JSON, которое не извлекается автоматически.

Вам понадобится десериализатор JSON. Например

    final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
    final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
    final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);


    KStreamBuilder builder = new KStreamBuilder();
    Properties props = new Properties();
    // load props 

    KStream<Bytes, JsonNode> source = builder.stream(Serdes.BytesSerde(), jsonSerde, "test")
        .filter((k, v) -> {
            return v.get("value").asDouble() > 20.0;
         });
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...