Kafka Streams агрегирует данные IoT с отслеживанием состояния - PullRequest
1 голос
/ 01 октября 2019

У меня есть входящие данные для автомобилей и я хочу создать новое событие для поездок. У меня есть следующее сообщение json из темы транспортного средства:

{ asset_id: ‘123’, location: [11.00, 12.00], journey_start: true }
{ asset_id: ‘123’, location: [12.00, 12.00] }
{ asset_id: ‘123’, location: [13.00, 12.00], journey_end: true }

От начала события путешествия до конца события путешествия он должен собирать местоположения, и если путешествие закончилось, он должен отправлять его в путешествие. тема мероприятия. В предыдущем примере сообщение должно выглядеть так:

{ asset_id: ‘123’, locations: [[11.00, 12.00], [12.00, 12.00], [13.00, 12.00]] }

Как только наступит новое событие начала поездки, оно должно отбросить предыдущее событие поездки из локального магазина.

Это то, что я имею до сих пор,Я не знаю, как выполнить агрегацию, добавить условную отправку в тему события поездки и удалить старые данные о местоположении:

public class Stream {
    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

        final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
        final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
        final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, JsonNode> vehicleEvent = builder.stream(
            "vehicle-event",
            Consumed.with(
                Serdes.String(),
                jsonSerde
            )
        );

        KStream<String, JsonNode> journeyEvent = vehicleEvent
            .groupBy((key, value) -> {
                JSONObject jsonObj = new JSONObject(value);
                return jsonObj.getString("asset_id");
            })
            .aggregate()


    Topology topology = builder.build();
    KafkaStreams streams = new KafkaStreams(topology, config);
    streams.start();
}

}

...